diff --git a/api/api-doc/system.json b/api/api-doc/system.json index 4bcbd10734..3582983b54 100644 --- a/api/api-doc/system.json +++ b/api/api-doc/system.json @@ -224,6 +224,24 @@ "parameters":[] } ] + }, + { + "path":"/system/shard_to_numa_node_mapping", + "operations":[ + { + "method":"GET", + "summary":"Get the shard-to-NUMA-node mapping. The nth element is the NUMA node ID of the nth shard.", + "type":"array", + "items":{ + "type":"long" + }, + "nickname":"get_shard_to_numa_node_mapping", + "produces":[ + "application/json" + ], + "parameters":[] + } + ] } ] } diff --git a/api/system.cc b/api/system.cc index d560a8a86d..e40b5869f2 100644 --- a/api/system.cc +++ b/api/system.cc @@ -197,6 +197,11 @@ void set_system(http_context& ctx, routes& r) { return make_ready_future(seastar::to_sstring(format)); }); }); + + hs::get_shard_to_numa_node_mapping.set(r, [](const_req req) { + auto mapping = local_engine->smp().shard_to_numa_node_mapping(); + return std::vector(mapping.begin(), mapping.end()); + }); } } diff --git a/test/cluster/test_sstable_compression_dictionaries_basic.py b/test/cluster/test_sstable_compression_dictionaries_basic.py index 6dc152d28e..e97d29928d 100644 --- a/test/cluster/test_sstable_compression_dictionaries_basic.py +++ b/test/cluster/test_sstable_compression_dictionaries_basic.py @@ -18,8 +18,24 @@ logger = logging.getLogger(__name__) async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]: return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers]) -def dict_memory(metrics: list[ScyllaMetrics]) -> int: - return sum([m.get("scylla_sstable_compression_dicts_total_live_memory_bytes") for m in metrics]) +def dict_memory(metrics: list[ScyllaMetrics]) -> list[int]: + """Returns memory usage by dicts on each shard in the system""" + metric_name = "scylla_sstable_compression_dicts_total_live_memory_bytes" + results = [] + for m in metrics: + for shard in itertools.count(): + value = m.get(metric_name, {'shard': shard}) + if value is None: + break + results.append(value) + return results + +async def get_shard_total_memory(manager: ManagerClient, server: ServerInfo) -> int: + """Return the total RAM available per shard""" + metrics = (await get_metrics(manager, [server]))[0] + value = metrics.get('scylla_memory_total_memory', {'shard': 0}) + assert value is not None and value > 0 + return int(value) async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: str): cql, hosts = await manager.get_ready_cql(servers) @@ -282,25 +298,42 @@ async def test_dict_memory_limit(manager: ManagerClient): # Bootstrap cluster and configure server logger.info("Bootstrapping cluster") - # Should be enough to fit 3 dicts of size <128 kiB. - # (The default dict size is ~110 kiB) - shard_memory = 1*1024*1024*1024 / 2 - intended_dict_memory_bugdet = 4*128*1024 - mem_fraction = intended_dict_memory_bugdet / shard_memory + intended_dict_memory_bugdet = 512*1024 + # Each ZSTD dict is ~128 kiB of raw dict content + ~32 kiB of decompression context. + # 512 kiB should be enough to fit 3 dicts. target_n_dicts = 3 + # Start with the dict budget effectively disabled; we'll set the real fraction + # below, once we can query the actual per-shard total memory. We can't compute + # the fraction up-front because in debug builds seastar uses the default + # allocator and ignores `--memory`, and we don't want to rely on side knowledge + # of what's the size of the memory pool in that case. servers = (await manager.servers_add(1, cmdline=[ *common_debug_cli_options, - #'--memory=1G', # test.py forces --memory=1G, and it can't be used twice '--smp=2', - f'--sstable-compression-dictionaries-memory-budget-fraction={mem_fraction}', + '--sstable-compression-dictionaries-memory-budget-fraction=1.0', ])) - async def assert_eventually_dict_memory_leq_than(threshold: float): + # Derive the fraction so the budget lands on `intended_dict_memory_bugdet`. + shard_memory = await get_shard_total_memory(manager, servers[0]) + mem_fraction = intended_dict_memory_bugdet / shard_memory + logger.info(f"shard_memory={shard_memory}, mem_fraction={mem_fraction}") + await live_update_config(manager, servers, 'sstable_compression_dictionaries_memory_budget_fraction', str(mem_fraction)) + + # Dictionaries are replicated per NUMA node, so memory usage scales with the + # number of NUMA nodes the shards span. + numa_mapping = await manager.api.get_shard_to_numa_node_mapping(servers[0].ip_addr) + n_numa_nodes = len(set(numa_mapping)) + logger.info(f"shard-to-NUMA mapping: {numa_mapping} ({n_numa_nodes} NUMA node(s))") + + async def assert_eventually_dict_memory_leq_than(*, shard_threshold: float = float('inf'), total_threshold: float = float('inf')): async def test_once(): mem = dict_memory(await get_metrics(manager, servers)) - assert mem <= threshold - return mem + for m in mem: + assert m <= shard_threshold + total_mem = sum(mem) + assert total_mem <= total_threshold + return total_mem # After all sstables holding a given dictionary alive are deleted, # the dictionary should be freed from memory. # But it's only near-instantaneous, not actually instantaneous, @@ -321,26 +354,29 @@ async def test_dict_memory_limit(manager: ManagerClient): ) for algo in ['ZstdWithDictsCompressor', 'LZ4WithDictsCompressor']: - logger.info(f"Creating table with algo {algo}") + logger.info(f"Creating the table") await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob);") logger.info("Disabling autocompaction for the table") await asyncio.gather(*[manager.api.disable_autocompaction(s.ip_addr, ks_name, cf_name) for s in servers]) - logger.info("Altering table to use zstd compression") + logger.info(f"Altering table to use {algo}") await cql.run_async( f"ALTER TABLE test.test WITH COMPRESSION = {{'sstable_compression': '{algo}'}};" ) logger.info("Creating several SSTables, retraining a dict after each") insert = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?);") - for i in range(target_n_dicts + 2): + # Write enough new dicts to exercise the limit + for i in range(target_n_dicts * 3): blob = random.randbytes(96 * 1024) await asyncio.gather(*[cql.run_async(insert, [i, blob]) for i in range(10)]) await asyncio.gather(*[manager.api.keyspace_flush(s.ip_addr, ks_name, cf_name) for s in servers]) await manager.api.retrain_dict(servers[0].ip_addr, "test", "test") - dict_mem = await assert_eventually_dict_memory_leq_than(intended_dict_memory_bugdet * 2) + # There should be at most one dict past the limit + shard_threshold = intended_dict_memory_bugdet * (target_n_dicts + 1) / target_n_dicts + dict_mem = await assert_eventually_dict_memory_leq_than(shard_threshold=shard_threshold) total_size = await get_total_data_size(manager, servers, ks_name, cf_name) logger.info(f"Round 0, step {i}: total_size={total_size}, dictmem={dict_mem}") @@ -352,7 +388,7 @@ async def test_dict_memory_limit(manager: ManagerClient): # One dict should occupy less than 256 kiB. # (That's not a requirement, just a result of how big the dictionaries are # at the moment of this writing. The contant might change one day). - await assert_eventually_dict_memory_leq_than(256*1024 - 1) + await assert_eventually_dict_memory_leq_than(total_threshold=(256*1024 * n_numa_nodes) -1) logger.info("Validating query results") select = cql.prepare("SELECT c FROM test.test WHERE pk = ?;") @@ -362,7 +398,7 @@ async def test_dict_memory_limit(manager: ManagerClient): logger.info("Dropping the table and checking that there are no leftovers") await cql.run_async("DROP TABLE test.test") - await assert_eventually_dict_memory_leq_than(0) + await assert_eventually_dict_memory_leq_than(total_threshold=0) async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerClient): """ diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index bc2c873e4f..81912f3bd0 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -466,6 +466,10 @@ class ScyllaRESTAPIClient: """Drop sstable caches""" await self.client.post(f"/system/drop_sstable_caches", host=node_ip) + async def get_shard_to_numa_node_mapping(self, node_ip: str) -> list[int]: + """Return the shard-to-NUMA-node mapping (index = shard, value = NUMA node ID).""" + return await self.client.get_json("/system/shard_to_numa_node_mapping", host=node_ip) + async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None: """Flush the specified or all tables in the keyspace""" url = f"/storage_service/keyspace_flush/{keyspace}"