Files
scylladb/test/cluster/test_sstable_compression_config.py
Nikos Dragazis a0bf932caa test/cluster: Add test for default SSTable compressor
The previous patch made the default compressor dependent on the
SSTABLE_COMPRESSION_DICTS feature:
* LZ4Compressor if the feature is disabled
* LZ4WithDictsCompressor if the feature is enabled

Add a test to verify that the cluster uses the right default in every
case.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
2025-10-30 15:53:54 +02:00

148 lines
6.4 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import os
import time
import pytest
import asyncio
import logging
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
from test.pylib.manager_client import ManagerClient, ScyllaVersionDescription
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def yaml_to_cmdline(config):
cmdline = []
for k, v in config.items():
if isinstance(v, dict):
v = ','.join([f'{kk}={vv}' for kk, vv in v.items()])
cmdline.append(f'--{k.replace("_", "-")}')
cmdline.append(str(v))
return cmdline
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_negative(manager: ManagerClient, cfg_source: str):
config = {
'sstable_compression_user_table_options': {
'sstable_compression': 'LZ4Compressor',
'chunk_length_in_kb': -1
}
}
expected_error = 'Invalid sstable_compression_user_table_options: Invalid negative or null for chunk_length_in_kb/chunk_length_kb'
if cfg_source == 'yaml':
await manager.server_add(config=config, expected_error=expected_error)
else:
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_beyond_max(manager: ManagerClient, cfg_source: str):
config = {
'sstable_compression_user_table_options': {
'sstable_compression': 'LZ4Compressor',
'chunk_length_in_kb': 256
}
}
expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be 128 or less.'
if cfg_source == 'yaml':
await manager.server_add(config=config, expected_error=expected_error)
else:
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_not_power_of_two(manager: ManagerClient, cfg_source: str):
config = {
'sstable_compression_user_table_options': {
'sstable_compression': 'LZ4Compressor',
'chunk_length_in_kb': 3
}
}
expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be a power of 2.'
if cfg_source == 'yaml':
await manager.server_add(config=config, expected_error=expected_error)
else:
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source: str):
config = {
'sstable_compression_user_table_options': {
'sstable_compression': 'LZ4Compressor',
'chunk_length_in_kb': 128,
'crc_check_chance': 1.1
}
}
expected_error = 'Invalid sstable_compression_user_table_options: crc_check_chance must be between 0.0 and 1.0.'
if cfg_source == 'yaml':
await manager.server_add(config=config, expected_error=expected_error)
else:
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
@pytest.mark.asyncio
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
"""
Check that the default SSTable compression algorithm is:
* LZ4Compressor if SSTABLE_COMPRESSION_DICTS is disabled.
* LZ4WithDictsCompressor if SSTABLE_COMPRESSION_DICTS is enabled.
- Start a 2-node cluster running a version where dictionary compression is not supported (2025.1).
- Create a table. Ensure that it uses the LZ4Compressor.
- Upgrade one node.
- Create a second table. Ensure that it still uses the LZ4Compressor.
- Upgrade the second node.
- Wait for SSTABLE_COMPRESSION_DICTS to be enabled.
- Create a third table. Ensure that it uses the new LZ4WithDictsCompressor.
"""
async def create_table_and_check_compression(cql, keyspace, table_name, expected_compression, context):
"""Helper to create a table and verify its compression algorithm."""
logger.info(f"Creating table {table_name} ({context})")
await cql.run_async(f"CREATE TABLE {keyspace}.{table_name} (pk int PRIMARY KEY, v int)")
logger.info(f"Verifying that the default compression algorithm is {expected_compression}")
result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table_name}'")
actual_compression = result[0].compression.get("sstable_compression")
logger.info(f"Actual compression for {table_name}: {actual_compression}")
assert actual_compression == expected_compression, \
f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"
new_exe = os.getenv("SCYLLA")
assert new_exe
logger.info("Starting servers with version 2025.1")
servers = await manager.servers_add(2, version=scylla_2025_1)
logger.info("Creating a test keyspace")
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
await create_table_and_check_compression(cql, "test_ks", "table_before_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "before upgrade")
logger.info("Upgrading server 0")
await manager.server_change_version(servers[0].server_id, new_exe)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await create_table_and_check_compression(cql, "test_ks", "table_during_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "during upgrade")
logger.info("Upgrading server 1")
await manager.server_change_version(servers[1].server_id, new_exe)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")
await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, host, time.time() + 60) for host in hosts))
await create_table_and_check_compression(cql, "test_ks", "table_after_upgrade", "LZ4WithDictsCompressor", "after upgrade and feature enabled")