test_sstable_compression_dictionaries: fix the test when running on more than one NUMA node

Dictionaries are replicated per NUMA node,
so memory usage scales with the number of NUMA nodes spanned by the instance.
The thresholds used in the test were written with the assumption that there's a
single copy of each dict.
For multi-node instances, the thresholds should be scaled accordingly.
This commit is contained in:
Michał Chojnowski
2026-05-29 12:35:42 +02:00
parent b5461ce66c
commit 33ca8fa4a9

View File

@@ -18,8 +18,24 @@ logger = logging.getLogger(__name__)
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers])
def dict_memory(metrics: list[ScyllaMetrics]) -> int:
return sum([m.get("scylla_sstable_compression_dicts_total_live_memory_bytes") for m in metrics])
def dict_memory(metrics: list[ScyllaMetrics]) -> list[int]:
"""Returns memory usage by dicts on each shard in the system"""
metric_name = "scylla_sstable_compression_dicts_total_live_memory_bytes"
results = []
for m in metrics:
for shard in itertools.count():
value = m.get(metric_name, {'shard': shard})
if value is None:
break
results.append(value)
return results
async def get_shard_total_memory(manager: ManagerClient, server: ServerInfo) -> int:
"""Return the total RAM available per shard"""
metrics = (await get_metrics(manager, [server]))[0]
value = metrics.get('scylla_memory_total_memory', {'shard': 0})
assert value is not None and value > 0
return int(value)
async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: str):
cql, hosts = await manager.get_ready_cql(servers)
@@ -282,25 +298,42 @@ async def test_dict_memory_limit(manager: ManagerClient):
# Bootstrap cluster and configure server
logger.info("Bootstrapping cluster")
# Should be enough to fit 3 dicts of size <128 kiB.
# (The default dict size is ~110 kiB)
shard_memory = 1*1024*1024*1024 / 2
intended_dict_memory_bugdet = 4*128*1024
mem_fraction = intended_dict_memory_bugdet / shard_memory
intended_dict_memory_bugdet = 512*1024
# Each ZSTD dict is ~128 kiB of raw dict content + ~32 kiB of decompression context.
# 512 kiB should be enough to fit 3 dicts.
target_n_dicts = 3
# Start with the dict budget effectively disabled; we'll set the real fraction
# below, once we can query the actual per-shard total memory. We can't compute
# the fraction up-front because in debug builds seastar uses the default
# allocator and ignores `--memory`, and we don't want to rely on side knowledge
# of what's the size of the memory pool in that case.
servers = (await manager.servers_add(1, cmdline=[
*common_debug_cli_options,
#'--memory=1G', # test.py forces --memory=1G, and it can't be used twice
'--smp=2',
f'--sstable-compression-dictionaries-memory-budget-fraction={mem_fraction}',
'--sstable-compression-dictionaries-memory-budget-fraction=1.0',
]))
async def assert_eventually_dict_memory_leq_than(threshold: float):
# Derive the fraction so the budget lands on `intended_dict_memory_bugdet`.
shard_memory = await get_shard_total_memory(manager, servers[0])
mem_fraction = intended_dict_memory_bugdet / shard_memory
logger.info(f"shard_memory={shard_memory}, mem_fraction={mem_fraction}")
await live_update_config(manager, servers, 'sstable_compression_dictionaries_memory_budget_fraction', str(mem_fraction))
# Dictionaries are replicated per NUMA node, so memory usage scales with the
# number of NUMA nodes the shards span.
numa_mapping = await manager.api.get_shard_to_numa_node_mapping(servers[0].ip_addr)
n_numa_nodes = len(set(numa_mapping))
logger.info(f"shard-to-NUMA mapping: {numa_mapping} ({n_numa_nodes} NUMA node(s))")
async def assert_eventually_dict_memory_leq_than(*, shard_threshold: float = float('inf'), total_threshold: float = float('inf')):
async def test_once():
mem = dict_memory(await get_metrics(manager, servers))
assert mem <= threshold
return mem
for m in mem:
assert m <= shard_threshold
total_mem = sum(mem)
assert total_mem <= total_threshold
return total_mem
# After all sstables holding a given dictionary alive are deleted,
# the dictionary should be freed from memory.
# But it's only near-instantaneous, not actually instantaneous,
@@ -321,26 +354,29 @@ async def test_dict_memory_limit(manager: ManagerClient):
)
for algo in ['ZstdWithDictsCompressor', 'LZ4WithDictsCompressor']:
logger.info(f"Creating table with algo {algo}")
logger.info(f"Creating the table")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob);")
logger.info("Disabling autocompaction for the table")
await asyncio.gather(*[manager.api.disable_autocompaction(s.ip_addr, ks_name, cf_name) for s in servers])
logger.info("Altering table to use zstd compression")
logger.info(f"Altering table to use {algo}")
await cql.run_async(
f"ALTER TABLE test.test WITH COMPRESSION = {{'sstable_compression': '{algo}'}};"
)
logger.info("Creating several SSTables, retraining a dict after each")
insert = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?);")
for i in range(target_n_dicts + 2):
# Write enough new dicts to exercise the limit
for i in range(target_n_dicts * 3):
blob = random.randbytes(96 * 1024)
await asyncio.gather(*[cql.run_async(insert, [i, blob]) for i in range(10)])
await asyncio.gather(*[manager.api.keyspace_flush(s.ip_addr, ks_name, cf_name) for s in servers])
await manager.api.retrain_dict(servers[0].ip_addr, "test", "test")
dict_mem = await assert_eventually_dict_memory_leq_than(intended_dict_memory_bugdet * 2)
# There should be at most one dict past the limit
shard_threshold = intended_dict_memory_bugdet * (target_n_dicts + 1) / target_n_dicts
dict_mem = await assert_eventually_dict_memory_leq_than(shard_threshold=shard_threshold)
total_size = await get_total_data_size(manager, servers, ks_name, cf_name)
logger.info(f"Round 0, step {i}: total_size={total_size}, dictmem={dict_mem}")
@@ -352,7 +388,7 @@ async def test_dict_memory_limit(manager: ManagerClient):
# One dict should occupy less than 256 kiB.
# (That's not a requirement, just a result of how big the dictionaries are
# at the moment of this writing. The contant might change one day).
await assert_eventually_dict_memory_leq_than(256*1024 - 1)
await assert_eventually_dict_memory_leq_than(total_threshold=(256*1024 * n_numa_nodes) -1)
logger.info("Validating query results")
select = cql.prepare("SELECT c FROM test.test WHERE pk = ?;")
@@ -362,7 +398,7 @@ async def test_dict_memory_limit(manager: ManagerClient):
logger.info("Dropping the table and checking that there are no leftovers")
await cql.run_async("DROP TABLE test.test")
await assert_eventually_dict_memory_leq_than(0)
await assert_eventually_dict_memory_leq_than(total_threshold=0)
async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerClient):
"""