Files
scylladb/test/cluster/test_sstable_compression_config.py
Andrei Chekun 99234f0a83 test.py: replace SCYLLA env var with build_mode fixture
Replace direct usage of the SCYLLA environment variable with the build_mode
pytest fixture and the path_to helper function. This makes the tests more
flexible and consistent with the test framework. It also allows the tests to
be run with xdist, where the environment variable may be left in the master
process and not be set in the workers.
Also use the fixture to get the Scylla binary from the suite; this
aligns with how the relocatable Scylla executable is obtained.
2026-02-24 09:48:38 +01:00

323 lines
14 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import os
import time
from pathlib import Path
import pytest
import asyncio
import logging
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
from test.pylib.manager_client import ManagerClient, ScyllaVersionDescription
from test.cluster.test_alternator import alternator_config, get_alternator
# Module-level logger named after this module; its level is set to INFO here
# so the logger.info() progress messages emitted by the tests are not filtered
# out by this logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def yaml_to_cmdline(config: dict) -> list[str]:
    """Convert a YAML-style config mapping into a flat list of command-line arguments.

    Every key yields two consecutive list items: the flag ``--<key>`` (with
    underscores replaced by dashes) and the value rendered as a string.
    A dict value is flattened into a comma-separated ``name=value`` list,
    keeping the inner names untouched.

    Args:
        config: mapping of option name to a scalar value or a dict of
            sub-options.

    Returns:
        The arguments in the iteration order of ``config``; an empty list
        for an empty mapping.
    """
    cmdline: list[str] = []
    for option, value in config.items():
        if isinstance(value, dict):
            rendered = ','.join(f'{name}={item}' for name, item in value.items())
        else:
            rendered = str(value)
        # Flag and value are emitted as two separate argv entries.
        cmdline.extend((f'--{option.replace("_", "-")}', rendered))
    return cmdline
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_negative(manager: ManagerClient, cfg_source: str):
    """
    Add a server whose `sstable_compression_user_table_options` carries a
    negative `chunk_length_in_kb`, supplied either through the config mapping
    ('yaml') or as command-line arguments (any other `cfg_source`).

    The validation message below is handed to `server_add` as `expected_error`
    (presumably the server is expected to fail startup with it - confirm
    against ManagerClient).
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': -1,
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: Invalid negative or null for chunk_length_in_kb/chunk_length_kb'

    # Choose how the options reach the server, then issue a single server_add call.
    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(**source_kwargs, expected_error=expected_error)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_beyond_max(manager: ManagerClient, cfg_source: str):
    """
    Add a server whose `sstable_compression_user_table_options` sets
    `chunk_length_in_kb` to 256, supplied either through the config mapping
    ('yaml') or as command-line arguments (any other `cfg_source`).

    The "must be 128 or less" message below is handed to `server_add` as
    `expected_error` (presumably the server is expected to fail startup with
    it - confirm against ManagerClient).
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': 256,
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be 128 or less.'

    # Choose how the options reach the server, then issue a single server_add call.
    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(**source_kwargs, expected_error=expected_error)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_not_power_of_two(manager: ManagerClient, cfg_source: str):
    """
    Add a server whose `sstable_compression_user_table_options` sets
    `chunk_length_in_kb` to 3, supplied either through the config mapping
    ('yaml') or as command-line arguments (any other `cfg_source`).

    The "must be a power of 2" message below is handed to `server_add` as
    `expected_error` (presumably the server is expected to fail startup with
    it - confirm against ManagerClient).
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': 3,
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be a power of 2.'

    # Choose how the options reach the server, then issue a single server_add call.
    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(**source_kwargs, expected_error=expected_error)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source: str):
    """
    Add a server whose `sstable_compression_user_table_options` sets
    `crc_check_chance` to 1.1 (with a `chunk_length_in_kb` of 128), supplied
    either through the config mapping ('yaml') or as command-line arguments
    (any other `cfg_source`).

    The "must be between 0.0 and 1.0" message below is handed to `server_add`
    as `expected_error` (presumably the server is expected to fail startup
    with it - confirm against ManagerClient).
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': 128,
        'crc_check_chance': 1.1,
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: crc_check_chance must be between 0.0 and 1.0.'

    # Choose how the options reach the server, then issue a single server_add call.
    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(**source_kwargs, expected_error=expected_error)
@pytest.mark.asyncio
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription, scylla_binary: Path):
    """
    Verify the default SSTable compression algorithm across a rolling upgrade.

    Expected defaults:
      * LZ4Compressor while SSTABLE_COMPRESSION_DICTS is disabled.
      * LZ4WithDictsCompressor once SSTABLE_COMPRESSION_DICTS is enabled.

    Scenario:
      1. Start a 2-node cluster on a version without dictionary compression
         support (2025.1).
      2. Create a table and check that it uses LZ4Compressor.
      3. Upgrade the first node, create a second table and check that it
         still uses LZ4Compressor.
      4. Upgrade the second node and wait for SSTABLE_COMPRESSION_DICTS to be
         enabled.
      5. Create a third table and check that it uses LZ4WithDictsCompressor.
    """
    ks_name = "test_ks"
    lz4 = "org.apache.cassandra.io.compress.LZ4Compressor"
    lz4_with_dicts = "LZ4WithDictsCompressor"

    async def create_table_and_check_compression(cql, keyspace, table_name, expected_compression, context):
        """Create `keyspace.table_name` and assert the compressor recorded in system_schema.tables."""
        logger.info(f"Creating table {table_name} ({context})")
        await cql.run_async(f"CREATE TABLE {keyspace}.{table_name} (pk int PRIMARY KEY, v int)")

        logger.info(f"Verifying that the default compression algorithm is {expected_compression}")
        query = f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table_name}'"
        rows = await cql.run_async(query)
        actual_compression = rows[0].compression.get("sstable_compression")
        logger.info(f"Actual compression for {table_name}: {actual_compression}")

        assert actual_compression == expected_compression, \
            f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"

    logger.info("Starting servers with version 2025.1")
    servers = await manager.servers_add(2, version=scylla_2025_1)

    logger.info("Creating a test keyspace")
    cql = manager.get_cql()
    await cql.run_async("CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")

    async def upgrade_server(index):
        """Switch servers[index] to `scylla_binary`, then wait for CQL on all servers and return the hosts."""
        logger.info(f"Upgrading server {index}")
        await manager.server_change_version(servers[index].server_id, scylla_binary)
        return await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    await create_table_and_check_compression(cql, ks_name, "table_before_upgrade", lz4, "before upgrade")

    # Mixed-version cluster: one node upgraded, one still on 2025.1.
    await upgrade_server(0)
    await create_table_and_check_compression(cql, ks_name, "table_during_upgrade", lz4, "during upgrade")

    # Fully upgraded cluster.
    hosts = await upgrade_server(1)

    logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")
    feature_waiters = [
        wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, host, time.time() + 60)
        for host in hosts
    ]
    await asyncio.gather(*feature_waiters)

    await create_table_and_check_compression(cql, ks_name, "table_after_upgrade", lz4_with_dicts, "after upgrade and feature enabled")
@pytest.mark.asyncio
async def test_alternator_tables_respect_compression_config(manager: ManagerClient):
    """
    Verify that every Alternator table - the base table as well as the
    auxiliary tables backing GSIs, LSIs and Streams - takes its default
    compression settings from the `sstable_compression_user_table_options`
    config option.

    Reproducer for #26914, where all Alternator tables had their default
    compression algorithm hardcoded to LZ4Compressor.
    """
    # Boot a server with non-default compression options plus the Alternator config.
    compression_config = {
        'sstable_compression_user_table_options': {
            'sstable_compression': 'ZstdCompressor',
            'chunk_length_in_kb': 64,
        }
    }
    server = await manager.server_add(config=compression_config | alternator_config)

    # Open both an Alternator and a CQL connection to it.
    alternator = get_alternator(server.ip_addr)
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)

    # One table definition that brings in a GSI, an LSI and Streams.
    key_schema = [
        {'AttributeName': 'p', 'KeyType': 'HASH'},
        {'AttributeName': 'c', 'KeyType': 'RANGE'},
    ]
    attribute_definitions = [
        {'AttributeName': 'p', 'AttributeType': 'S'},
        {'AttributeName': 'c', 'AttributeType': 'S'},
        {'AttributeName': 'x', 'AttributeType': 'S'},
    ]
    local_indexes = [
        {
            'IndexName': 'lsi_name',
            'KeySchema': [
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'x', 'KeyType': 'RANGE'},
            ],
            'Projection': {'ProjectionType': 'ALL'},
        }
    ]
    global_indexes = [
        {
            'IndexName': 'gsi_name',
            'KeySchema': [{'AttributeName': 'x', 'KeyType': 'HASH'}],
            'Projection': {'ProjectionType': 'ALL'},
        }
    ]
    table = alternator.create_table(
        TableName='base',
        BillingMode='PAY_PER_REQUEST',
        Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
        KeySchema=key_schema,
        AttributeDefinitions=attribute_definitions,
        LocalSecondaryIndexes=local_indexes,
        GlobalSecondaryIndexes=global_indexes,
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
    )

    try:
        ks = f"alternator_{table.name}"

        # (system_schema table, name column, object name), checked in this order:
        # base table, GSI view, LSI view, CDC log table.
        targets = [
            ("tables", "table_name", table.name),
            ("views", "view_name", f"{table.name}:gsi_name"),
            ("views", "view_name", f"{table.name}!:lsi_name"),
            ("tables", "table_name", f"{table.name}_scylla_cdc_log"),
        ]

        for schema_table, name_column, object_name in targets:
            rows = await cql.run_async(f"SELECT compression FROM system_schema.{schema_table} WHERE keyspace_name = '{ks}' AND {name_column} = '{object_name}'")
            assert len(rows) == 1
            compression = rows[0].compression
            assert compression.get("sstable_compression") == "org.apache.cassandra.io.compress.ZstdCompressor"
            assert int(compression.get("chunk_length_in_kb", 0)) == 64
    finally:
        table.delete()
@pytest.mark.asyncio
async def test_cql_base_tables_respect_compression_config(manager: ManagerClient):
    """
    Check that the default compression settings for CQL base tables are taken
    from the `sstable_compression_user_table_options` config option.

    A server is added with ZstdCompressor and a 64 KiB chunk length as the
    configured defaults; a table created without explicit compression options
    must then report exactly those settings in `system_schema.tables`.
    """
    compression_config = {
        'sstable_compression_user_table_options': {
            'sstable_compression': 'ZstdCompressor',
            'chunk_length_in_kb': 64,
        },
    }
    server = await manager.server_add(config=compression_config)
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)

    ks = f"cql_aux_test_{int(time.time())}"
    await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}")
    try:
        # The table is created inside the try block so that the keyspace is
        # dropped even when table creation itself fails.
        await cql.run_async(f"CREATE TABLE {ks}.base (pk int PRIMARY KEY, v int)")

        result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'base'")
        assert len(result) == 1
        compression = result[0].compression
        assert compression.get("sstable_compression") == "org.apache.cassandra.io.compress.ZstdCompressor"
        assert int(compression.get("chunk_length_in_kb", 0)) == 64
    finally:
        await cql.run_async(f"DROP KEYSPACE {ks}")
@pytest.mark.asyncio
async def test_cql_aux_tables_respect_compression_config(manager: ManagerClient):
    """
    Check that the default compression settings for CQL auxiliary tables
    (materialized views, secondary indexes and CDC logs) are taken from the
    `sstable_compression_user_table_options` config option.

    To prove that auxiliary tables don't inherit compression from the base table,
    we use a base table compressor that is different from the one set in
    `sstable_compression_user_table_options`.

    A reproducer for #26914 - all auxiliary tables have their default compression
    algorithm hardcoded to LZ4Compressor.

    TODO: Replace this test with a config-agnostic cqlpy test if aux tables are
    modified to inherit compression from the base table (issue #20388).
    """
    compression_config = {
        'sstable_compression_user_table_options': {
            'sstable_compression': 'ZstdCompressor',
            'chunk_length_in_kb': 64,
        },
    }
    server = await manager.server_add(config=compression_config)
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)

    ks = f"cql_aux_test_{int(time.time())}"
    await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}")
    try:
        # All schema setup lives inside the try block so that the keyspace is
        # dropped even when one of the DDL statements fails.
        # The base table deliberately uses a compressor that differs from the
        # configured default.
        await cql.run_async(f"CREATE TABLE {ks}.base (pk int PRIMARY KEY, v int) WITH compression = {{ 'sstable_compression': 'DeflateCompressor' }}")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT * FROM {ks}.base WHERE pk IS NOT NULL AND v IS NOT NULL PRIMARY KEY (v, pk)")
        await cql.run_async(f"CREATE INDEX ON {ks}.base(v)")
        await cql.run_async(f"ALTER TABLE {ks}.base WITH cdc = {{'enabled': true}}")

        def check_compression(result):
            """Assert that exactly one row came back and that it reports the configured defaults."""
            assert len(result) == 1
            compression = result[0].compression
            assert compression.get("sstable_compression") == "org.apache.cassandra.io.compress.ZstdCompressor"
            assert int(compression.get("chunk_length_in_kb", 0)) == 64

        # Materialized view
        result = await cql.run_async(f"SELECT compression FROM system_schema.views WHERE keyspace_name = '{ks}' AND view_name = 'mv'")
        check_compression(result)

        # Secondary index (looked up in system_schema.views)
        result = await cql.run_async(f"SELECT compression FROM system_schema.views WHERE keyspace_name = '{ks}' AND view_name = 'base_v_idx_index'")
        check_compression(result)

        # CDC log table
        result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'base_scylla_cdc_log'")
        check_compression(result)
    finally:
        await cql.run_async(f"DROP KEYSPACE {ks}")