Merge 'db/config: Change default SSTable compressor to LZ4WithDictsCompressor' from Nikos Dragazis
`sstable_compression_user_table_options` allows configuring a node-global SSTable compression algorithm for user tables via scylla.yaml. The current default is LZ4Compressor (inherited from Cassandra). Make LZ4WithDictsCompressor the new default. Metrics from real datasets in the field have shown significant improvements in compression ratios. If the dictionary compression feature is not enabled in the cluster (e.g., during an upgrade), fall back to the `LZ4Compressor`. Once the feature is enabled, flip the default back to the dictionary compressor using with a listener callback. Fixes #26610. Closes scylladb/scylladb#26697 * github.com:scylladb/scylladb: test/cluster: Add test for default SSTable compressor db/config: Change default SSTable compressor to LZ4WithDictsCompressor db/config: Deprecate sstable_compression_dictionaries_allow_in_ddl boost/cql_query_test: Get expected compressor from config
This commit is contained in:
@@ -145,9 +145,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
|
||||
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
|
||||
}
|
||||
compression_parameters cp(*compression_options);
|
||||
cp.validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
|
||||
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
|
||||
cp.validate(compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)));
|
||||
}
|
||||
|
||||
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
|
||||
|
||||
@@ -1315,15 +1315,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me", "ms"})
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
|
||||
"Server-global user table compression options. If enabled, all user tables"
|
||||
"will be compressed using the provided options, unless overridden"
|
||||
"by compression options in the table schema. The available options are:\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
|
||||
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
|
||||
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Deprecated, true,
|
||||
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
|
||||
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
|
||||
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
|
||||
|
||||
43
main.cc
43
main.cc
@@ -2221,12 +2221,47 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// Semantic validation of sstable compression parameters from config.
|
||||
// Adding here (i.e., after `join_cluster`) to ensure that the
|
||||
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
|
||||
//
|
||||
// Also, if the dictionary compression feature is not enabled, use
|
||||
// LZ4Compressor as the default algorithm instead of LZ4WithDictsCompressor.
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
auto& sstable_compression_options = cfg->sstable_compression_user_table_options;
|
||||
|
||||
gms::feature::listener_registration reg_listener;
|
||||
|
||||
if (!sstable_compression_options.is_set() && !dicts_feature_enabled) {
|
||||
if (sstable_compression_options().get_algorithm() != compression_parameters::algorithm::lz4_with_dicts) {
|
||||
on_internal_error(startlog, "expected LZ4WithDictsCompressor as default algorithm for sstable_compression_user_table_options.");
|
||||
}
|
||||
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is disabled. Overriding default SSTable compression to use LZ4Compressor instead of LZ4WithDictsCompressor.");
|
||||
compression_parameters original_params{sstable_compression_options().get_options()};
|
||||
auto params = sstable_compression_options().get_options();
|
||||
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(compression_parameters::algorithm::lz4));
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(compression_parameters{params}, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
|
||||
// Register a callback to update the default compression algorithm when the feature is enabled.
|
||||
// Precondition:
|
||||
// The callback must run inside seastar::async context:
|
||||
// - If the listener fires immediately, we are running inside seastar::async already.
|
||||
// - If the listener is deferred, `feature_service::enable()` runs it inside seastar::async.
|
||||
reg_listener = feature_service.local().sstable_compression_dicts.when_enabled([&sstable_compression_options, params = std::move(original_params)] {
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is now enabled. Overriding default SSTable compression to use LZ4WithDictsCompressor.");
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(params, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
const auto& dicts_usage_allowed = cfg->sstable_compression_dictionaries_allow_in_ddl();
|
||||
cfg->sstable_compression_user_table_options().validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)),
|
||||
compression_parameters::dicts_usage_allowed(dicts_usage_allowed));
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)));
|
||||
} catch (const std::exception& e) {
|
||||
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
|
||||
throw bad_configuration_error();
|
||||
|
||||
@@ -539,17 +539,13 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
|
||||
}
|
||||
}
|
||||
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) const {
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled) const {
|
||||
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
|
||||
if (!dicts_enabled) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
|
||||
"all nodes are upgraded to a versions which supports it",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
if (!dicts_allowed) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
}
|
||||
if (_chunk_length) {
|
||||
auto chunk_length = _chunk_length.value();
|
||||
|
||||
@@ -106,8 +106,7 @@ public:
|
||||
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
|
||||
|
||||
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
|
||||
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
|
||||
void validate(dicts_feature_enabled, dicts_usage_allowed) const;
|
||||
void validate(dicts_feature_enabled) const;
|
||||
|
||||
std::map<sstring, sstring> get_options() const;
|
||||
|
||||
|
||||
@@ -2015,7 +2015,7 @@ SEASTAR_TEST_CASE(test_table_compression) {
|
||||
|
||||
e.execute_cql("create table tb6 (foo text PRIMARY KEY, bar text);").get();
|
||||
BOOST_REQUIRE(e.local_db().has_schema("ks", "tb6"));
|
||||
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb6")->get_compressor_params().get_algorithm() == compression_parameters::algorithm::lz4);
|
||||
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb6")->get_compressor_params().get_algorithm() == e.local_db().get_config().sstable_compression_user_table_options().get_algorithm());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -4,10 +4,14 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import os
|
||||
import time
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
|
||||
from test.pylib.manager_client import ManagerClient, ScyllaVersionDescription
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
@@ -23,25 +27,6 @@ def yaml_to_cmdline(config):
|
||||
return cmdline
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
||||
async def test_dict_compression_not_allowed(manager: ManagerClient, cfg_source: str):
|
||||
config = {
|
||||
'sstable_compression_dictionaries_allow_in_ddl': False,
|
||||
'sstable_compression_user_table_options': {
|
||||
'sstable_compression': 'ZstdWithDictsCompressor',
|
||||
'chunk_length_in_kb': 4,
|
||||
'compression_level': 10
|
||||
}
|
||||
}
|
||||
expected_error = 'Invalid sstable_compression_user_table_options: sstable_compression ZstdWithDictsCompressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`'
|
||||
|
||||
if cfg_source == 'yaml':
|
||||
await manager.server_add(config=config, expected_error=expected_error)
|
||||
else:
|
||||
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
||||
async def test_chunk_size_negative(manager: ManagerClient, cfg_source: str):
|
||||
@@ -105,4 +90,59 @@ async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source
|
||||
if cfg_source == 'yaml':
|
||||
await manager.server_add(config=config, expected_error=expected_error)
|
||||
else:
|
||||
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
||||
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
|
||||
"""
|
||||
Check that the default SSTable compression algorithm is:
|
||||
* LZ4Compressor if SSTABLE_COMPRESSION_DICTS is disabled.
|
||||
* LZ4WithDictsCompressor if SSTABLE_COMPRESSION_DICTS is enabled.
|
||||
|
||||
- Start a 2-node cluster running a version where dictionary compression is not supported (2025.1).
|
||||
- Create a table. Ensure that it uses the LZ4Compressor.
|
||||
- Upgrade one node.
|
||||
- Create a second table. Ensure that it still uses the LZ4Compressor.
|
||||
- Upgrade the second node.
|
||||
- Wait for SSTABLE_COMPRESSION_DICTS to be enabled.
|
||||
- Create a third table. Ensure that it uses the new LZ4WithDictsCompressor.
|
||||
"""
|
||||
async def create_table_and_check_compression(cql, keyspace, table_name, expected_compression, context):
|
||||
"""Helper to create a table and verify its compression algorithm."""
|
||||
logger.info(f"Creating table {table_name} ({context})")
|
||||
await cql.run_async(f"CREATE TABLE {keyspace}.{table_name} (pk int PRIMARY KEY, v int)")
|
||||
|
||||
logger.info(f"Verifying that the default compression algorithm is {expected_compression}")
|
||||
result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table_name}'")
|
||||
actual_compression = result[0].compression.get("sstable_compression")
|
||||
logger.info(f"Actual compression for {table_name}: {actual_compression}")
|
||||
|
||||
assert actual_compression == expected_compression, \
|
||||
f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"
|
||||
|
||||
new_exe = os.getenv("SCYLLA")
|
||||
assert new_exe
|
||||
|
||||
logger.info("Starting servers with version 2025.1")
|
||||
servers = await manager.servers_add(2, version=scylla_2025_1)
|
||||
|
||||
logger.info("Creating a test keyspace")
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
|
||||
|
||||
await create_table_and_check_compression(cql, "test_ks", "table_before_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "before upgrade")
|
||||
|
||||
logger.info("Upgrading server 0")
|
||||
await manager.server_change_version(servers[0].server_id, new_exe)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
await create_table_and_check_compression(cql, "test_ks", "table_during_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "during upgrade")
|
||||
|
||||
logger.info("Upgrading server 1")
|
||||
await manager.server_change_version(servers[1].server_id, new_exe)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")
|
||||
await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, host, time.time() + 60) for host in hosts))
|
||||
|
||||
await create_table_and_check_compression(cql, "test_ks", "table_after_upgrade", "LZ4WithDictsCompressor", "after upgrade and feature enabled")
|
||||
@@ -8,14 +8,10 @@ import logging
|
||||
import pytest
|
||||
import itertools
|
||||
import time
|
||||
import contextlib
|
||||
import typing
|
||||
from test.pylib.manager_client import ManagerClient, ServerInfo
|
||||
from test.pylib.rest_client import read_barrier, ScyllaMetrics, HTTPError
|
||||
from test.pylib.rest_client import read_barrier, ScyllaMetrics
|
||||
from cassandra.cluster import ConsistencyLevel, Session as CassandraSession
|
||||
from cassandra.policies import FallthroughRetryPolicy, ConstantReconnectionPolicy
|
||||
from cassandra.protocol import ServerError
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -494,83 +490,3 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
for algo in nondict_algorithms:
|
||||
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
|
||||
assert (await get_compressor_names(no_compression)) == set()
|
||||
|
||||
async def test_sstable_compression_dictionaries_allow_in_ddl(manager: ManagerClient):
|
||||
"""
|
||||
Tests the sstable_compression_dictionaries_allow_in_ddl option.
|
||||
When it's disabled, ALTER and CREATE statements should not be allowed
|
||||
to configure tables to use compression dictionaries for sstables.
|
||||
"""
|
||||
# Bootstrap cluster and configure server
|
||||
logger.info("Bootstrapping cluster")
|
||||
|
||||
servers = (await manager.servers_add(1, cmdline=[
|
||||
*common_debug_cli_options,
|
||||
"--sstable-compression-dictionaries-allow-in-ddl=false",
|
||||
], auto_rack_dc="dc1"))
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def with_expect_server_error(msg):
|
||||
try:
|
||||
yield
|
||||
except ServerError as e:
|
||||
if e.message != msg:
|
||||
raise
|
||||
else:
|
||||
raise Exception('Expected a ServerError, got no exceptions')
|
||||
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
await cql.run_async("""
|
||||
CREATE KEYSPACE test
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
|
||||
""")
|
||||
|
||||
for new_algo in ['LZ4WithDicts', 'ZstdWithDicts']:
|
||||
logger.info(f"Tested algorithm: {new_algo}")
|
||||
table_name = f"test.{new_algo}"
|
||||
|
||||
logger.info("Check that disabled sstable_compression_dictionaries_allow_in_ddl prevents CREATE with dict compression")
|
||||
async with with_expect_server_error(f"sstable_compression {new_algo}Compressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`"):
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
CREATE TABLE {table_name} (pk int PRIMARY KEY, c blob)
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Enable the config option")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "true")
|
||||
|
||||
logger.info("CREATE the table with dict compression")
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
CREATE TABLE {table_name} (pk int PRIMARY KEY, c blob)
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Disable compression on the table")
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
ALTER TABLE {table_name}
|
||||
WITH COMPRESSION = {{'sstable_compression': ''}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Disable the config option again")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "false")
|
||||
|
||||
logger.info("Check that disabled sstable_compression_dictionaries_allow_in_ddl prevents ALTER with dict compression")
|
||||
async with with_expect_server_error(f"sstable_compression {new_algo}Compressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`"):
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
ALTER TABLE {table_name}
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Enable the config option again")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "true")
|
||||
|
||||
logger.info("ALTER the table with dict compression")
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
ALTER TABLE {table_name}
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
logger.info("Enable the config option again")
|
||||
|
||||
logger.info("Disable the config option for the next test")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "false")
|
||||
|
||||
Reference in New Issue
Block a user