The previous patch made the default compressor dependent on the SSTABLE_COMPRESSION_DICTS feature: * LZ4Compressor if the feature is disabled * LZ4WithDictsCompressor if the feature is enabled Add a test to verify that the cluster uses the right default in every case. Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
148 lines
6.4 KiB
Python
148 lines
6.4 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import os
|
|
import time
|
|
import pytest
|
|
import asyncio
|
|
import logging
|
|
|
|
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
|
|
from test.pylib.manager_client import ManagerClient, ScyllaVersionDescription
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
|
|
def yaml_to_cmdline(config):
|
|
cmdline = []
|
|
for k, v in config.items():
|
|
if isinstance(v, dict):
|
|
v = ','.join([f'{kk}={vv}' for kk, vv in v.items()])
|
|
cmdline.append(f'--{k.replace("_", "-")}')
|
|
cmdline.append(str(v))
|
|
return cmdline
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
|
async def test_chunk_size_negative(manager: ManagerClient, cfg_source: str):
|
|
config = {
|
|
'sstable_compression_user_table_options': {
|
|
'sstable_compression': 'LZ4Compressor',
|
|
'chunk_length_in_kb': -1
|
|
}
|
|
}
|
|
expected_error = 'Invalid sstable_compression_user_table_options: Invalid negative or null for chunk_length_in_kb/chunk_length_kb'
|
|
if cfg_source == 'yaml':
|
|
await manager.server_add(config=config, expected_error=expected_error)
|
|
else:
|
|
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
|
async def test_chunk_size_beyond_max(manager: ManagerClient, cfg_source: str):
|
|
config = {
|
|
'sstable_compression_user_table_options': {
|
|
'sstable_compression': 'LZ4Compressor',
|
|
'chunk_length_in_kb': 256
|
|
}
|
|
}
|
|
expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be 128 or less.'
|
|
if cfg_source == 'yaml':
|
|
await manager.server_add(config=config, expected_error=expected_error)
|
|
else:
|
|
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
|
async def test_chunk_size_not_power_of_two(manager: ManagerClient, cfg_source: str):
|
|
config = {
|
|
'sstable_compression_user_table_options': {
|
|
'sstable_compression': 'LZ4Compressor',
|
|
'chunk_length_in_kb': 3
|
|
}
|
|
}
|
|
expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be a power of 2.'
|
|
|
|
if cfg_source == 'yaml':
|
|
await manager.server_add(config=config, expected_error=expected_error)
|
|
else:
|
|
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
|
async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source: str):
|
|
config = {
|
|
'sstable_compression_user_table_options': {
|
|
'sstable_compression': 'LZ4Compressor',
|
|
'chunk_length_in_kb': 128,
|
|
'crc_check_chance': 1.1
|
|
}
|
|
}
|
|
expected_error = 'Invalid sstable_compression_user_table_options: crc_check_chance must be between 0.0 and 1.0.'
|
|
if cfg_source == 'yaml':
|
|
await manager.server_add(config=config, expected_error=expected_error)
|
|
else:
|
|
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
|
|
"""
|
|
Check that the default SSTable compression algorithm is:
|
|
* LZ4Compressor if SSTABLE_COMPRESSION_DICTS is disabled.
|
|
* LZ4WithDictsCompressor if SSTABLE_COMPRESSION_DICTS is enabled.
|
|
|
|
- Start a 2-node cluster running a version where dictionary compression is not supported (2025.1).
|
|
- Create a table. Ensure that it uses the LZ4Compressor.
|
|
- Upgrade one node.
|
|
- Create a second table. Ensure that it still uses the LZ4Compressor.
|
|
- Upgrade the second node.
|
|
- Wait for SSTABLE_COMPRESSION_DICTS to be enabled.
|
|
- Create a third table. Ensure that it uses the new LZ4WithDictsCompressor.
|
|
"""
|
|
async def create_table_and_check_compression(cql, keyspace, table_name, expected_compression, context):
|
|
"""Helper to create a table and verify its compression algorithm."""
|
|
logger.info(f"Creating table {table_name} ({context})")
|
|
await cql.run_async(f"CREATE TABLE {keyspace}.{table_name} (pk int PRIMARY KEY, v int)")
|
|
|
|
logger.info(f"Verifying that the default compression algorithm is {expected_compression}")
|
|
result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table_name}'")
|
|
actual_compression = result[0].compression.get("sstable_compression")
|
|
logger.info(f"Actual compression for {table_name}: {actual_compression}")
|
|
|
|
assert actual_compression == expected_compression, \
|
|
f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"
|
|
|
|
new_exe = os.getenv("SCYLLA")
|
|
assert new_exe
|
|
|
|
logger.info("Starting servers with version 2025.1")
|
|
servers = await manager.servers_add(2, version=scylla_2025_1)
|
|
|
|
logger.info("Creating a test keyspace")
|
|
cql = manager.get_cql()
|
|
await cql.run_async("CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
|
|
|
|
await create_table_and_check_compression(cql, "test_ks", "table_before_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "before upgrade")
|
|
|
|
logger.info("Upgrading server 0")
|
|
await manager.server_change_version(servers[0].server_id, new_exe)
|
|
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
|
|
|
await create_table_and_check_compression(cql, "test_ks", "table_during_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "during upgrade")
|
|
|
|
logger.info("Upgrading server 1")
|
|
await manager.server_change_version(servers[1].server_id, new_exe)
|
|
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
|
|
|
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")
|
|
await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, host, time.time() + 60) for host in hosts))
|
|
|
|
await create_table_and_check_compression(cql, "test_ks", "table_after_upgrade", "LZ4WithDictsCompressor", "after upgrade and feature enabled") |