Merge 'test_sstable_compression_dictionaries: fix the test when running on more than one NUMA node' from Michał Chojnowski

Dictionaries are replicated per NUMA node, so memory usage scales with the number of NUMA nodes spanned by the instance. The thresholds used in the test were written with the assumption that there's a single copy of each dict.
Fix that.

Fixes SCYLLADB-2320

Should be backported to releases since 2025.2, in case someone wants to run the CI of those on NUMA machines.

Closes scylladb/scylladb#30178

* github.com:scylladb/scylladb:
  test_sstable_compression_dictionaries: fix the test when running on more than one NUMA node
  test/pylib: add get_shard_to_numa_node_mapping()
  api: add system/shard_to_numa_node_mapping
This commit is contained in:
Avi Kivity
2026-06-03 11:29:51 +03:00
4 changed files with 81 additions and 18 deletions

View File

@@ -224,6 +224,24 @@
"parameters":[]
}
]
},
{
"path":"/system/shard_to_numa_node_mapping",
"operations":[
{
"method":"GET",
"summary":"Get the shard-to-NUMA-node mapping. The nth element is the NUMA node ID of the nth shard.",
"type":"array",
"items":{
"type":"long"
},
"nickname":"get_shard_to_numa_node_mapping",
"produces":[
"application/json"
],
"parameters":[]
}
]
}
]
}

View File

@@ -197,6 +197,11 @@ void set_system(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(seastar::to_sstring(format));
});
});
hs::get_shard_to_numa_node_mapping.set(r, [](const_req req) {
auto mapping = local_engine->smp().shard_to_numa_node_mapping();
return std::vector<unsigned>(mapping.begin(), mapping.end());
});
}
}

View File

@@ -18,8 +18,24 @@ logger = logging.getLogger(__name__)
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
    """Query the metrics of every server concurrently.

    One `manager.metrics.query()` call is issued per server (keyed by its
    `ip_addr`); the results come back in the same order as `servers`.
    """
    pending_queries = [manager.metrics.query(server.ip_addr) for server in servers]
    return await asyncio.gather(*pending_queries)
def dict_memory(metrics: list[ScyllaMetrics]) -> int:
    """Return the live dictionary memory summed over all given metrics snapshots."""
    metric_name = "scylla_sstable_compression_dicts_total_live_memory_bytes"
    return sum(snapshot.get(metric_name) for snapshot in metrics)
def dict_memory(metrics: list[ScyllaMetrics]) -> list[int]:
    """Return the dictionary memory usage of each shard in the system.

    For every metrics snapshot, shards are probed in increasing order starting
    from 0 until the metric lookup yields None; all values found are
    collected into one flat list (snapshot order, then shard order).
    """
    metric_name = "scylla_sstable_compression_dicts_total_live_memory_bytes"
    per_shard_usage = []
    for snapshot in metrics:
        shard = 0
        # A None lookup marks the first shard index that does not exist.
        while (usage := snapshot.get(metric_name, {'shard': shard})) is not None:
            per_shard_usage.append(usage)
            shard += 1
    return per_shard_usage
async def get_shard_total_memory(manager: ManagerClient, server: ServerInfo) -> int:
    """Return shard 0's `scylla_memory_total_memory` metric of `server` as an int.

    Asserts that the metric is present and strictly positive.
    """
    server_metrics = await get_metrics(manager, [server])
    total_memory = server_metrics[0].get('scylla_memory_total_memory', {'shard': 0})
    assert total_memory is not None and total_memory > 0
    return int(total_memory)
async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: str):
cql, hosts = await manager.get_ready_cql(servers)
@@ -282,25 +298,42 @@ async def test_dict_memory_limit(manager: ManagerClient):
# Bootstrap cluster and configure server
logger.info("Bootstrapping cluster")
# Should be enough to fit 3 dicts of size <128 kiB.
# (The default dict size is ~110 kiB)
shard_memory = 1*1024*1024*1024 / 2
intended_dict_memory_bugdet = 4*128*1024
mem_fraction = intended_dict_memory_bugdet / shard_memory
intended_dict_memory_bugdet = 512*1024
# Each ZSTD dict is ~128 kiB of raw dict content + ~32 kiB of decompression context.
# 512 kiB should be enough to fit 3 dicts.
target_n_dicts = 3
# Start with the dict budget effectively disabled; we'll set the real fraction
# below, once we can query the actual per-shard total memory. We can't compute
# the fraction up-front because in debug builds seastar uses the default
# allocator and ignores `--memory`, and we don't want to rely on side knowledge
# of what's the size of the memory pool in that case.
servers = (await manager.servers_add(1, cmdline=[
*common_debug_cli_options,
#'--memory=1G', # test.py forces --memory=1G, and it can't be used twice
'--smp=2',
f'--sstable-compression-dictionaries-memory-budget-fraction={mem_fraction}',
'--sstable-compression-dictionaries-memory-budget-fraction=1.0',
]))
async def assert_eventually_dict_memory_leq_than(threshold: float):
# Derive the fraction so the budget lands on `intended_dict_memory_bugdet`.
shard_memory = await get_shard_total_memory(manager, servers[0])
mem_fraction = intended_dict_memory_bugdet / shard_memory
logger.info(f"shard_memory={shard_memory}, mem_fraction={mem_fraction}")
await live_update_config(manager, servers, 'sstable_compression_dictionaries_memory_budget_fraction', str(mem_fraction))
# Dictionaries are replicated per NUMA node, so memory usage scales with the
# number of NUMA nodes the shards span.
numa_mapping = await manager.api.get_shard_to_numa_node_mapping(servers[0].ip_addr)
n_numa_nodes = len(set(numa_mapping))
logger.info(f"shard-to-NUMA mapping: {numa_mapping} ({n_numa_nodes} NUMA node(s))")
async def assert_eventually_dict_memory_leq_than(*, shard_threshold: float = float('inf'), total_threshold: float = float('inf')):
async def test_once():
mem = dict_memory(await get_metrics(manager, servers))
assert mem <= threshold
return mem
for m in mem:
assert m <= shard_threshold
total_mem = sum(mem)
assert total_mem <= total_threshold
return total_mem
# After all sstables holding a given dictionary alive are deleted,
# the dictionary should be freed from memory.
# But it's only near-instantaneous, not actually instantaneous,
@@ -321,26 +354,29 @@ async def test_dict_memory_limit(manager: ManagerClient):
)
for algo in ['ZstdWithDictsCompressor', 'LZ4WithDictsCompressor']:
logger.info(f"Creating table with algo {algo}")
logger.info(f"Creating the table")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob);")
logger.info("Disabling autocompaction for the table")
await asyncio.gather(*[manager.api.disable_autocompaction(s.ip_addr, ks_name, cf_name) for s in servers])
logger.info("Altering table to use zstd compression")
logger.info(f"Altering table to use {algo}")
await cql.run_async(
f"ALTER TABLE test.test WITH COMPRESSION = {{'sstable_compression': '{algo}'}};"
)
logger.info("Creating several SSTables, retraining a dict after each")
insert = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?);")
for i in range(target_n_dicts + 2):
# Write enough new dicts to exercise the limit
for i in range(target_n_dicts * 3):
blob = random.randbytes(96 * 1024)
await asyncio.gather(*[cql.run_async(insert, [i, blob]) for i in range(10)])
await asyncio.gather(*[manager.api.keyspace_flush(s.ip_addr, ks_name, cf_name) for s in servers])
await manager.api.retrain_dict(servers[0].ip_addr, "test", "test")
dict_mem = await assert_eventually_dict_memory_leq_than(intended_dict_memory_bugdet * 2)
# There should be at most one dict past the limit
shard_threshold = intended_dict_memory_bugdet * (target_n_dicts + 1) / target_n_dicts
dict_mem = await assert_eventually_dict_memory_leq_than(shard_threshold=shard_threshold)
total_size = await get_total_data_size(manager, servers, ks_name, cf_name)
logger.info(f"Round 0, step {i}: total_size={total_size}, dictmem={dict_mem}")
@@ -352,7 +388,7 @@ async def test_dict_memory_limit(manager: ManagerClient):
# One dict should occupy less than 256 kiB.
# (That's not a requirement, just a result of how big the dictionaries are
# at the moment of this writing. The constant might change one day).
await assert_eventually_dict_memory_leq_than(256*1024 - 1)
await assert_eventually_dict_memory_leq_than(total_threshold=(256*1024 * n_numa_nodes) -1)
logger.info("Validating query results")
select = cql.prepare("SELECT c FROM test.test WHERE pk = ?;")
@@ -362,7 +398,7 @@ async def test_dict_memory_limit(manager: ManagerClient):
logger.info("Dropping the table and checking that there are no leftovers")
await cql.run_async("DROP TABLE test.test")
await assert_eventually_dict_memory_leq_than(0)
await assert_eventually_dict_memory_leq_than(total_threshold=0)
async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerClient):
"""

View File

@@ -466,6 +466,10 @@ class ScyllaRESTAPIClient:
"""Drop sstable caches"""
await self.client.post(f"/system/drop_sstable_caches", host=node_ip)
async def get_shard_to_numa_node_mapping(self, node_ip: str) -> list[int]:
    """Fetch the shard-to-NUMA-node mapping of the node at `node_ip`.

    Performs a GET on `/system/shard_to_numa_node_mapping` and returns the
    decoded JSON: the element at index `i` is the NUMA node ID of shard `i`.
    """
    mapping = await self.client.get_json("/system/shard_to_numa_node_mapping", host=node_ip)
    return mapping
async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
"""Flush the specified or all tables in the keyspace"""
url = f"/storage_service/keyspace_flush/{keyspace}"