# Copyright 2021-present ScyllaDB # # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 ################################################################## # This file provides a few nodetool-compatible commands that may be useful # for tests. The intent is *not* to test nodetool itself, but just to # use nodetool functionality - e.g., "nodetool flush" when testing CQL. # # When testing against a locally-running Scylla these functions do not # actually use nodetool but rather Scylla's REST API. This simplifies # running the test because an external Java process implementing JMX is # not needed. However, when the REST API port is not available (i.e, when # testing against Cassandra or some remote installation of Scylla) the # external "nodetool" command is used, and must be available in the path or # chosen with the NODETOOL environment variable. import requests import os import subprocess import shutil import pytest import re # For a "cql" object connected to one node, find the REST API URL # with the same node and port 10000. # TODO: We may need to change this function or its callers to add proper # support for testing on multi-node clusters. def rest_api_url(cql): return f'http://{cql.cluster.contact_points[0]}:10000' # Check whether the REST API at port 10000 is available - if not we will # fall back to using an external "nodetool" program. # We only check this once per REST API URL, and cache the decision. checked_rest_api = {} def has_rest_api(cql): url = rest_api_url(cql) if not url in checked_rest_api: # Scylla's REST API does not have an official "ping" command, # so we just list the keyspaces as a (usually) short operation try: # Use a short timeout to give up quickly if the REST API port # is blocked in a way that the request can just hang forever. ok = requests.get(f'{url}/column_family/name/keyspace', timeout=1).ok except: ok = False checked_rest_api[url] = ok return checked_rest_api[url] # Find the external "nodetool" executable (can be overridden by the NODETOOL # environment variable). Only call this if the REST API doesn't work. def nodetool_cmd(): if nodetool_cmd.cmd: return nodetool_cmd.cmd if not nodetool_cmd.failed: nodetool_cmd.conf = os.getenv('NODETOOL') or 'nodetool' nodetool_cmd.cmd = shutil.which(nodetool_cmd.conf) if nodetool_cmd.cmd is None: nodetool_cmd.failed = True if nodetool_cmd.failed: pytest.fail(f"Error: Can't find {nodetool_cmd.conf}. Please set the NODETOOL environment variable to the path of the nodetool utility.", pytrace=False) return nodetool_cmd.cmd nodetool_cmd.cmd = None nodetool_cmd.failed = False nodetool_cmd.conf = False # Run the external "nodetool" executable (can be overridden by the NODETOOL # environment variable). Only call this if the REST API doesn't work. def run_nodetool(cql, *args): # TODO: We may need to change this function or its callers to add proper # support for testing on multi-node clusters. host = cql.cluster.contact_points[0] subprocess.run([nodetool_cmd(), '-h', host, *args]) def flush(cql, table): ks, cf = table.split('.') if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/storage_service/keyspace_flush/{ks}', params={'cf' : cf}) else: run_nodetool(cql, "flush", ks, cf) def flush_keyspace(cql, ks): if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/storage_service/keyspace_flush/{ks}') else: run_nodetool(cql, "flush", ks) def flush_all(cql): if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/storage_service/flush') else: run_nodetool(cql, "flush") def compact(cql, table, flush_memtables=True): ks, cf = table.split('.') if has_rest_api(cql): params = {'cf': cf} if not flush_memtables: params["flush_memtables"] = "false" requests.post(f'{rest_api_url(cql)}/storage_service/keyspace_compaction/{ks}', params=params) else: args = [] if not flush_memtables else ["--flush-memtables", "false"] args.extend([ks, cf]) run_nodetool(cql, "compact", *args) def compact_keyspace(cql, ks, flush_memtables=True): if has_rest_api(cql): params = None if not flush_memtables: params = {"flush_memtables": "false"} requests.post(f'{rest_api_url(cql)}/storage_service/keyspace_compaction/{ks}', params=params) else: args = [] if not flush_memtables else ["--flush-memtables", "false"] args.extend([ks, cf]) run_nodetool(cql, "compact", *args) def take_snapshot(cql, table, tag, skip_flush): ks, cf = table.split('.') if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush}) else: args = ['--tag', tag, '--table', cf] if skip_flush: args.append('--skip-flush') args.append(ks) run_nodetool(cql, "snapshot", *args) def del_snapshot(cql, tag:str, keyspaces:list[str] = []): if has_rest_api(cql): params = {'tag': tag} if keyspaces: params["kn"] = ','.join(keyspaces) requests.delete(f'{rest_api_url(cql)}/storage_service/snapshots/', params=params) else: args = ['--tag', tag] if keyspaces: args.extend(keyspaces) run_nodetool(cql, "clearsnapshot", *args) def refreshsizeestimates(cql): if has_rest_api(cql): # The "nodetool refreshsizeestimates" is not available, or needed, in Scylla pass else: run_nodetool(cql, "refreshsizeestimates") def enablebinary(cql): if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/storage_service/native_transport') else: run_nodetool(cql, "enablebinary") def disablebinary(cql): if has_rest_api(cql): requests.delete(f'{rest_api_url(cql)}/storage_service/native_transport') else: run_nodetool(cql, "disablebinary") def setlogginglevel(cql, logger, level): if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level}) else: run_nodetool(cql, "setlogginglevel", ["--logger", logger, "--level", level]) def parse_keyspace_table(name, separator_chars = '.'): pat = rf"(?P\w+)(?:[{separator_chars}](?P\w+))?" m = re.match(pat, name) return m.group('keyspace'), m.group('table') class no_autocompaction_context: """Disable autocompaction for the enclosed scope, for the provided keyspace(s) or keyspace.table(s). """ def __init__(self, cql, *names): self._cql = cql self._names = list(names) def __enter__(self): for name in self._names: ks, tbl = parse_keyspace_table(name, '.:') if has_rest_api(self._cql): api_path = f"/storage_service/auto_compaction/{ks}" if not tbl else \ f"/column_family/autocompaction/{ks}:{tbl}" ret = requests.delete(f'{rest_api_url(self._cql)}{api_path}') if not ret.ok: raise RuntimeError(f"failed to disable autocompaction using {api_path}: {ret.text}") else: run_nodetool(self._cql, "disableautocompaction", ks, tbl) def __exit__(self, exc_type, exc_value, exc_traceback): for name in self._names: ks, tbl = parse_keyspace_table(name, '.:') if has_rest_api(self._cql): api_path = f"/storage_service/auto_compaction/{ks}" if not tbl else \ f"/column_family/autocompaction/{ks}:{tbl}" ret = requests.post(f'{rest_api_url(self._cql)}{api_path}') if not ret.ok: raise RuntimeError(f"failed to enable autocompaction using {api_path}: {ret.text}") else: run_nodetool(self._cql, "enableautocompaction", ks, tbl) # Send a message to the Scylla log. E.g., we can write a message to the log # indicating that a test has started, which will make it easier to see which # test caused which errors in the log. # If the REST API cannot be reached (e.g., we're running the test against # Cassandra or the REST port is blocked), this function will do nothing. def scylla_log(cql, message, level): if has_rest_api(cql): requests.post(f'{rest_api_url(cql)}/system/log?message={requests.utils.quote(message)}&level={level}')