From 33ca8fa4a94dcdb9b21d20a73a2366fafa340c7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Fri, 29 May 2026 12:35:42 +0200 Subject: [PATCH] test_sstable_compression_dictionaries: fix the test when running on more than one NUMA node Dictionaries are replicated per NUMA node, so memory usage scales with the number of NUMA nodes spanned by the instance. The thresholds used in the test were written with the assumption that there's a single copy of each dict. For multi-node instances, the thresholds should be scaled accordingly. --- ..._sstable_compression_dictionaries_basic.py | 72 ++++++++++++++----- 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/test/cluster/test_sstable_compression_dictionaries_basic.py b/test/cluster/test_sstable_compression_dictionaries_basic.py index 6dc152d28e..e97d29928d 100644 --- a/test/cluster/test_sstable_compression_dictionaries_basic.py +++ b/test/cluster/test_sstable_compression_dictionaries_basic.py @@ -18,8 +18,24 @@ logger = logging.getLogger(__name__) async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]: return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers]) -def dict_memory(metrics: list[ScyllaMetrics]) -> int: - return sum([m.get("scylla_sstable_compression_dicts_total_live_memory_bytes") for m in metrics]) +def dict_memory(metrics: list[ScyllaMetrics]) -> list[int]: + """Returns memory usage by dicts on each shard in the system""" + metric_name = "scylla_sstable_compression_dicts_total_live_memory_bytes" + results = [] + for m in metrics: + for shard in itertools.count(): + value = m.get(metric_name, {'shard': shard}) + if value is None: + break + results.append(value) + return results + +async def get_shard_total_memory(manager: ManagerClient, server: ServerInfo) -> int: + """Return the total RAM available per shard""" + metrics = (await get_metrics(manager, [server]))[0] + value = metrics.get('scylla_memory_total_memory', {'shard': 0}) + assert value is not None and value > 0 + return int(value) async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: str): cql, hosts = await manager.get_ready_cql(servers) @@ -282,25 +298,42 @@ async def test_dict_memory_limit(manager: ManagerClient): # Bootstrap cluster and configure server logger.info("Bootstrapping cluster") - # Should be enough to fit 3 dicts of size <128 kiB. - # (The default dict size is ~110 kiB) - shard_memory = 1*1024*1024*1024 / 2 - intended_dict_memory_bugdet = 4*128*1024 - mem_fraction = intended_dict_memory_bugdet / shard_memory + intended_dict_memory_bugdet = 512*1024 + # Each ZSTD dict is ~128 kiB of raw dict content + ~32 kiB of decompression context. + # 512 kiB should be enough to fit 3 dicts. target_n_dicts = 3 + # Start with the dict budget effectively disabled; we'll set the real fraction + # below, once we can query the actual per-shard total memory. We can't compute + # the fraction up-front because in debug builds seastar uses the default + # allocator and ignores `--memory`, and we don't want to rely on side knowledge + # of what's the size of the memory pool in that case. servers = (await manager.servers_add(1, cmdline=[ *common_debug_cli_options, - #'--memory=1G', # test.py forces --memory=1G, and it can't be used twice '--smp=2', - f'--sstable-compression-dictionaries-memory-budget-fraction={mem_fraction}', + '--sstable-compression-dictionaries-memory-budget-fraction=1.0', ])) - async def assert_eventually_dict_memory_leq_than(threshold: float): + # Derive the fraction so the budget lands on `intended_dict_memory_bugdet`. + shard_memory = await get_shard_total_memory(manager, servers[0]) + mem_fraction = intended_dict_memory_bugdet / shard_memory + logger.info(f"shard_memory={shard_memory}, mem_fraction={mem_fraction}") + await live_update_config(manager, servers, 'sstable_compression_dictionaries_memory_budget_fraction', str(mem_fraction)) + + # Dictionaries are replicated per NUMA node, so memory usage scales with the + # number of NUMA nodes the shards span. + numa_mapping = await manager.api.get_shard_to_numa_node_mapping(servers[0].ip_addr) + n_numa_nodes = len(set(numa_mapping)) + logger.info(f"shard-to-NUMA mapping: {numa_mapping} ({n_numa_nodes} NUMA node(s))") + + async def assert_eventually_dict_memory_leq_than(*, shard_threshold: float = float('inf'), total_threshold: float = float('inf')): async def test_once(): mem = dict_memory(await get_metrics(manager, servers)) - assert mem <= threshold - return mem + for m in mem: + assert m <= shard_threshold + total_mem = sum(mem) + assert total_mem <= total_threshold + return total_mem # After all sstables holding a given dictionary alive are deleted, # the dictionary should be freed from memory. # But it's only near-instantaneous, not actually instantaneous, @@ -321,26 +354,29 @@ async def test_dict_memory_limit(manager: ManagerClient): ) for algo in ['ZstdWithDictsCompressor', 'LZ4WithDictsCompressor']: - logger.info(f"Creating table with algo {algo}") + logger.info(f"Creating the table") await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob);") logger.info("Disabling autocompaction for the table") await asyncio.gather(*[manager.api.disable_autocompaction(s.ip_addr, ks_name, cf_name) for s in servers]) - logger.info("Altering table to use zstd compression") + logger.info(f"Altering table to use {algo}") await cql.run_async( f"ALTER TABLE test.test WITH COMPRESSION = {{'sstable_compression': '{algo}'}};" ) logger.info("Creating several SSTables, retraining a dict after each") insert = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?);") - for i in range(target_n_dicts + 2): + # Write enough new dicts to exercise the limit + for i in range(target_n_dicts * 3): blob = random.randbytes(96 * 1024) await asyncio.gather(*[cql.run_async(insert, [i, blob]) for i in range(10)]) await asyncio.gather(*[manager.api.keyspace_flush(s.ip_addr, ks_name, cf_name) for s in servers]) await manager.api.retrain_dict(servers[0].ip_addr, "test", "test") - dict_mem = await assert_eventually_dict_memory_leq_than(intended_dict_memory_bugdet * 2) + # There should be at most one dict past the limit + shard_threshold = intended_dict_memory_bugdet * (target_n_dicts + 1) / target_n_dicts + dict_mem = await assert_eventually_dict_memory_leq_than(shard_threshold=shard_threshold) total_size = await get_total_data_size(manager, servers, ks_name, cf_name) logger.info(f"Round 0, step {i}: total_size={total_size}, dictmem={dict_mem}") @@ -352,7 +388,7 @@ async def test_dict_memory_limit(manager: ManagerClient): # One dict should occupy less than 256 kiB. # (That's not a requirement, just a result of how big the dictionaries are # at the moment of this writing. The contant might change one day). - await assert_eventually_dict_memory_leq_than(256*1024 - 1) + await assert_eventually_dict_memory_leq_than(total_threshold=(256*1024 * n_numa_nodes) -1) logger.info("Validating query results") select = cql.prepare("SELECT c FROM test.test WHERE pk = ?;") @@ -362,7 +398,7 @@ async def test_dict_memory_limit(manager: ManagerClient): logger.info("Dropping the table and checking that there are no leftovers") await cql.run_async("DROP TABLE test.test") - await assert_eventually_dict_memory_leq_than(0) + await assert_eventually_dict_memory_leq_than(total_threshold=0) async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerClient): """