diff --git a/test/cluster/dtest/ccmlib/scylla_node.py b/test/cluster/dtest/ccmlib/scylla_node.py index caa6744b83..85abc2b47c 100644 --- a/test/cluster/dtest/ccmlib/scylla_node.py +++ b/test/cluster/dtest/ccmlib/scylla_node.py @@ -52,6 +52,18 @@ KNOWN_LOG_LEVELS = { "OFF": "info", } +# Captures the aggregate metric before the "[READ ..., WRITE ...]" block. +STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*') + +# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block. +STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*') + +# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block. +STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*') + +# Splits a "key : value" line into key and value. +STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$') + class NodeError(Exception): def __init__(self, msg: str, process: int | None = None): @@ -528,6 +540,15 @@ class ScyllaNode: return self.cluster.manager.server_get_workdir(server_id=self.server_id) def stress(self, stress_options: list[str], **kwargs): + """ + Run `cassandra-stress` against this node. + This method does not do any result parsing. + + :param stress_options: List of options to pass to `cassandra-stress`. + :param kwargs: Additional arguments to pass to `subprocess.Popen()`. + :return: Named tuple with `stdout`, `stderr`, and `rc` (return code). + """ + cmd_args = ["cassandra-stress"] + stress_options if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")): @@ -550,21 +571,33 @@ class ScyllaNode: pass - @staticmethod - def _set_stress_val(key, val, res): + def _set_stress_val(self, key, val, res): + """ + Normalize a stress result string and populate aggregate/read/write metrics. + + Removes comma-thousands separators from numbers, converts to float, + stores the aggregate metric under `key`. + If the value contains a "[READ ..., WRITE ...]" block, also stores the + read and write metrics under `key:read` and `key:write`. + + :param key: The metric name + :param val: The metric value string + :param res: The dictionary to populate + """ + def parse_num(s): return float(s.replace(',', '')) if "[" in val: - p = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*') + p = STRESS_SUMMARY_PATTERN m = p.match(val) if m: res[key] = parse_num(m.group(1)) - p = re.compile(r'^.*READ:\s*([\d\.\,]+\d?)[^\d].*') + p = STRESS_READ_PATTERN m = p.match(val) if m: res[key + ":read"] = parse_num(m.group(1)) - p = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*') + p = STRESS_WRITE_PATTERN m = p.match(val) if m: res[key + ":write"] = parse_num(m.group(1)) @@ -573,20 +606,31 @@ class ScyllaNode: res[key] = parse_num(val) except ValueError: res[key] = val + - def stress_object(self, stress_options=None, ignore_errors=None, **kwargs): + """ + Run stress test and return results as a structured metrics dictionary. + + Runs `stress()`, finds the `Results:` section in `stdout`, and then + processes each `key : value` line, putting it into a dictionary. + + :param stress_options: List of stress options to pass to `stress()`. + :param ignore_errors: Deprecated (no effect). + :param kwargs: Additional arguments to pass to `stress()`. + :return: Dictionary of stress test results. + """ if ignore_errors: self.warning("passing `ignore_errors` to stress_object() is deprecated") ret = self.stress(stress_options, **kwargs) - p = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$') + p = STRESS_KEY_VALUE_PATTERN res = {} start = False - for line in [s.strip() for s in ret.stdout.splitlines()]: + for line in (s.strip() for s in ret.stdout.splitlines()): if start: m = p.match(line) if m: - ScyllaNode._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res) + self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res) else: if line == 'Results:': start = True