Files
scylladb/test/cqlpy/nodetool.py
Nadav Har'El 64a5eee6b9 test/cqlpy: insert test names into Scylla logs
Both test.py and test/cqlpy/run run many test functions against the same
Scylla process. In the resulting log file, it is hard to understand which
log messages are related to which test. In this patch, we log a message
(using the "/system/log" REST API) every time a test is started or ends.

The messages look like this:

    INFO  2025-04-22 15:10:44,625 [shard 1:strm] api - /system/log:
    test/cqlpy: Starting test_lwt.py::test_lwt_missing_row_with_static
    ...
    INFO  2025-04-22 15:10:44,631 [shard 0:strm] api - /system/log:
    test/cqlpy: Ended test_lwt.py::test_lwt_missing_row_with_static

We already had a similar feature in test/alternator, added three years
ago in commit b0371b6bf8. The implementation
is similar but not identical due to different available utility functions,
and in any case it's very simple.

While at it, this patch also fixes has_rest_api() to time out after
one second. Without this, if the REST API is blocked in a way that
a connection attempt just hangs, the tests can hang. With the new
timeout, the test will hang for a second, realize the REST API is
not available, and remember this decision (the next tests will not
wait one second again). We had the same bug in Alternator, and fixed
it in 758f8f01d7. This one second "pause"
will only happen if the REST API port is blocked - in the more typical
case the REST API port is just not listening but not blocked, and the
failure will be noticed immediately and won't wait a whole second.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#23857
2025-04-23 12:04:14 +03:00

210 lines
8.4 KiB
Python

# Copyright 2021-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
##################################################################
# This file provides a few nodetool-compatible commands that may be useful
# for tests. The intent is *not* to test nodetool itself, but just to
# use nodetool functionality - e.g., "nodetool flush" when testing CQL.
#
# When testing against a locally-running Scylla these functions do not
# actually use nodetool but rather Scylla's REST API. This simplifies
# running the test because an external Java process implementing JMX is
# not needed. However, when the REST API port is not available (i.e., when
# testing against Cassandra or some remote installation of Scylla) the
# external "nodetool" command is used, and must be available in the path or
# chosen with the NODETOOL environment variable.
import requests
import os
import subprocess
import shutil
import pytest
import re
# Given a "cql" object connected to one node, return the base URL of the
# REST API on that same node, at port 10000. The node is taken to be the
# first contact point of the cql object's cluster.
# TODO: We may need to change this function or its callers to add proper
# support for testing on multi-node clusters.
def rest_api_url(cql):
    host = cql.cluster.contact_points[0]
    return f'http://{host}:10000'
# Check whether the REST API at port 10000 is available - if not we will
# fall back to using an external "nodetool" program.
# We only probe once per REST API URL and cache the decision in the
# checked_rest_api dictionary (REST API URL -> True/False).
checked_rest_api = {}
def has_rest_api(cql):
    url = rest_api_url(cql)
    if url not in checked_rest_api:
        # Scylla's REST API does not have an official "ping" command,
        # so we just list the keyspaces as a (usually) short operation
        try:
            # Use a short timeout to give up quickly if the REST API port
            # is blocked in a way that the request can just hang forever.
            ok = requests.get(f'{url}/column_family/name/keyspace', timeout=1).ok
        except Exception:
            # Any failure of the probe request means the REST API is not
            # usable. We deliberately do not use a bare "except:" here, so
            # that KeyboardInterrupt and SystemExit propagate instead of
            # being silently cached as "REST API unavailable".
            ok = False
        checked_rest_api[url] = ok
    return checked_rest_api[url]
# Locate the external "nodetool" executable and return its full path. The
# name or path to look for comes from the NODETOOL environment variable,
# defaulting to "nodetool". Only call this if the REST API doesn't work.
# The outcome of the lookup is remembered in attributes of the function
# itself (cmd, failed, conf) so the search is done only once; if the
# executable cannot be found, the calling test is failed via pytest.fail().
def nodetool_cmd():
    cached = nodetool_cmd.cmd
    if cached:
        return cached
    if not nodetool_cmd.failed:
        # First call: resolve the executable and record whether that worked.
        nodetool_cmd.conf = os.getenv('NODETOOL') or 'nodetool'
        nodetool_cmd.cmd = shutil.which(nodetool_cmd.conf)
        nodetool_cmd.failed = nodetool_cmd.cmd is None
    if nodetool_cmd.failed:
        pytest.fail(f"Error: Can't find {nodetool_cmd.conf}. Please set the NODETOOL environment variable to the path of the nodetool utility.", pytrace=False)
    return nodetool_cmd.cmd
nodetool_cmd.cmd = None
nodetool_cmd.failed = False
nodetool_cmd.conf = False
# Invoke the external "nodetool" executable (see nodetool_cmd()) against the
# node that the given "cql" object is connected to, passing it the given
# arguments. Only call this if the REST API doesn't work.
# The exit status of the command is not inspected.
def run_nodetool(cql, *args):
    # TODO: We may need to change this function or its callers to add proper
    # support for testing on multi-node clusters.
    host = cql.cluster.contact_points[0]
    command = [nodetool_cmd(), '-h', host]
    command.extend(args)
    subprocess.run(command)
# Flush one table to disk. "table" is a "keyspace.table" string.
# Uses the REST API when it is available, otherwise "nodetool flush".
def flush(cql, table):
    ks, cf = table.split('.')
    if not has_rest_api(cql):
        run_nodetool(cql, "flush", ks, cf)
        return
    endpoint = f'{rest_api_url(cql)}/storage_service/keyspace_flush/{ks}'
    requests.post(endpoint, params={'cf' : cf})
# Flush the given keyspace to disk.
# Uses the REST API when it is available, otherwise "nodetool flush".
def flush_keyspace(cql, ks):
    if not has_rest_api(cql):
        run_nodetool(cql, "flush", ks)
        return
    endpoint = f'{rest_api_url(cql)}/storage_service/keyspace_flush/{ks}'
    requests.post(endpoint)
# Flush everything to disk, without naming a keyspace or table.
# Uses the REST API when it is available, otherwise "nodetool flush".
def flush_all(cql):
    if not has_rest_api(cql):
        run_nodetool(cql, "flush")
        return
    endpoint = f'{rest_api_url(cql)}/storage_service/flush'
    requests.post(endpoint)
# Compact one table. "table" is a "keyspace.table" string.
# If flush_memtables is False, the request explicitly asks not to flush
# memtables ("flush_memtables=false" in the REST API, "--flush-memtables
# false" for nodetool); otherwise no such option is sent.
# Uses the REST API when it is available, otherwise "nodetool compact".
def compact(cql, table, flush_memtables=True):
    ks, cf = table.split('.')
    if has_rest_api(cql):
        params = {'cf': cf}
        if not flush_memtables:
            params["flush_memtables"] = "false"
        requests.post(f'{rest_api_url(cql)}/storage_service/keyspace_compaction/{ks}', params=params)
    else:
        # Mirror the REST branch: only pass the option when the caller
        # asked NOT to flush. (This condition used to be inverted.)
        args = ["--flush-memtables", "false"] if not flush_memtables else []
        args.extend([ks, cf])
        run_nodetool(cql, "compact", *args)
# Compact the given keyspace.
# If flush_memtables is False, the request explicitly asks not to flush
# memtables ("flush_memtables=false" in the REST API, "--flush-memtables
# false" for nodetool); otherwise no such option is sent.
# Uses the REST API when it is available, otherwise "nodetool compact".
def compact_keyspace(cql, ks, flush_memtables=True):
    if has_rest_api(cql):
        params = None
        if not flush_memtables:
            params = {"flush_memtables": "false"}
        requests.post(f'{rest_api_url(cql)}/storage_service/keyspace_compaction/{ks}', params=params)
    else:
        # Mirror the REST branch: only pass the option when the caller
        # asked NOT to flush. (This condition used to be inverted.)
        args = ["--flush-memtables", "false"] if not flush_memtables else []
        # Only the keyspace is named here - this function has no table
        # argument (the code used to reference an undefined "cf").
        args.append(ks)
        run_nodetool(cql, "compact", *args)
# Take a snapshot, named "tag", of one table. "table" is a "keyspace.table"
# string. If skip_flush is true, the snapshot is requested with the
# skip-flush option ("sf" in the REST API, "--skip-flush" for nodetool).
# Uses the REST API when it is available, otherwise "nodetool snapshot".
def take_snapshot(cql, table, tag, skip_flush):
    ks, cf = table.split('.')
    if not has_rest_api(cql):
        options = ['--tag', tag, '--table', cf]
        if skip_flush:
            options.append('--skip-flush')
        # The keyspace goes last, after all the options.
        run_nodetool(cql, "snapshot", *options, ks)
        return
    query = {'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush}
    requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params=query)
# Delete the snapshot named "tag". If a non-empty list of keyspaces is
# given, those keyspace names are passed along with the request (as a
# comma-separated "kn" parameter in the REST API, or as extra arguments to
# nodetool); otherwise only the tag is passed.
# Uses the REST API when it is available, otherwise "nodetool clearsnapshot".
def del_snapshot(cql, tag:str, keyspaces:list[str] = []):
    if not has_rest_api(cql):
        # An empty (or None) keyspaces list contributes no extra arguments.
        run_nodetool(cql, "clearsnapshot", '--tag', tag, *(keyspaces or []))
        return
    query = {'tag': tag}
    if keyspaces:
        query["kn"] = ','.join(keyspaces)
    requests.delete(f'{rest_api_url(cql)}/storage_service/snapshots/', params=query)
# Run "nodetool refreshsizeestimates", but only when the REST API is not
# available. When the REST API is available this function does nothing:
# the "nodetool refreshsizeestimates" operation is not available, or needed,
# in Scylla.
def refreshsizeestimates(cql):
    if not has_rest_api(cql):
        run_nodetool(cql, "refreshsizeestimates")
# Equivalent of "nodetool enablebinary": a POST to the REST API's
# native_transport endpoint when the REST API is available, otherwise the
# external nodetool command.
def enablebinary(cql):
    if not has_rest_api(cql):
        run_nodetool(cql, "enablebinary")
        return
    endpoint = f'{rest_api_url(cql)}/storage_service/native_transport'
    requests.post(endpoint)
# Equivalent of "nodetool disablebinary": a DELETE on the REST API's
# native_transport endpoint when the REST API is available, otherwise the
# external nodetool command.
def disablebinary(cql):
    if not has_rest_api(cql):
        run_nodetool(cql, "disablebinary")
        return
    endpoint = f'{rest_api_url(cql)}/storage_service/native_transport'
    requests.delete(endpoint)
# Set the log level of the given logger.
# Uses the REST API when it is available, otherwise "nodetool
# setlogginglevel".
def setlogginglevel(cql, logger, level):
    if has_rest_api(cql):
        requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level})
    else:
        # Each option must be its own argument: run_nodetool() forwards its
        # arguments straight into the command's argument list, so passing
        # them as one list object (as this code used to do) puts a nested
        # list in the argument list, which subprocess cannot execute.
        # NOTE(review): Cassandra's nodetool documents this command with
        # positional "<class> <level>" arguments - confirm that the
        # "--logger"/"--level" flags are accepted by the nodetool in use.
        run_nodetool(cql, "setlogginglevel", "--logger", logger, "--level", level)
# Split a name of the form "keyspace" or "keyspace<sep>table" into a
# (keyspace, table) tuple, where <sep> is any single character from
# separator_chars. The table is None when the name has no separator
# followed by a table name. Matching starts at the beginning of the name
# and anything after the matched part is ignored.
def parse_keyspace_table(name, separator_chars = '.'):
    pattern = re.compile(rf"(?P<keyspace>\w+)(?:[{separator_chars}](?P<table>\w+))?")
    found = pattern.match(name)
    return found.group('keyspace', 'table')
class no_autocompaction_context:
    """Disable autocompaction for the enclosed scope, for the provided keyspace(s) or keyspace.table(s).

    Each name is either "keyspace" or "keyspace.table" ("keyspace:table" is
    accepted as well). Autocompaction is disabled for every name when the
    context is entered and enabled again for every name when it is exited.
    Uses the REST API when it is available, otherwise the external nodetool
    command. With the REST API, a failed request raises RuntimeError.
    """
    def __init__(self, cql, *names):
        self._cql = cql
        self._names = list(names)

    # Enable (enable=True) or disable (enable=False) autocompaction for
    # every name given to the constructor.
    def _set_autocompaction(self, enable):
        verb = "enable" if enable else "disable"
        for name in self._names:
            ks, tbl = parse_keyspace_table(name, '.:')
            if has_rest_api(self._cql):
                # A whole keyspace and a single table use different endpoints.
                if tbl:
                    api_path = f"/column_family/autocompaction/{ks}:{tbl}"
                else:
                    api_path = f"/storage_service/auto_compaction/{ks}"
                # POST enables autocompaction, DELETE disables it.
                send = requests.post if enable else requests.delete
                ret = send(f'{rest_api_url(self._cql)}{api_path}')
                if not ret.ok:
                    raise RuntimeError(f"failed to {verb} autocompaction using {api_path}: {ret.text}")
            else:
                # Pass the table only when the name actually had one. For a
                # keyspace-only name tbl is None, and None must not end up
                # in the command's argument list.
                args = [ks, tbl] if tbl else [ks]
                run_nodetool(self._cql, f"{verb}autocompaction", *args)

    def __enter__(self):
        self._set_autocompaction(False)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._set_autocompaction(True)
# Write a message, at the given log level, into the Scylla log through the
# "/system/log" REST API. E.g., a test framework can log that a test has
# started, making it easier to see which test caused which errors in the log.
# If the REST API cannot be reached (e.g., we're running the test against
# Cassandra or the REST port is blocked), this function will do nothing.
def scylla_log(cql, message, level):
    if not has_rest_api(cql):
        return
    # The message is URL-quoted because it is embedded in the query string.
    quoted = requests.utils.quote(message)
    requests.post(f'{rest_api_url(cql)}/system/log?message={quoted}&level={level}')