Files
scylladb/test/cqlpy/run.py
Nadav Har'El 154cecda71 test: set low alternator_ttl_period_in_seconds in CQL tests
In test/alternator/run we set alternator_ttl_period_in_seconds to a very
low number (0.5 seconds) to allow TTL tests to expire items very quickly
and finish quickly.

Until now, we didn't need to do this for CQL tests, because they weren't
using this Alternator-only feature. Now that CQL uses the same expiration
feature with its original configuration parameter, we need to set it in
CQL tests too.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2026-02-25 14:59:43 +02:00

530 lines
23 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import glob
import sys
import time
import shutil
import signal
import atexit
import requests
# run_with_temporary_dir() is a utility function for running a process, such
# as Scylla and Cassandra, inside its own new temporary directory,
# and ensure that on exit for any reason - success, failure, signal or
# exception - the subprocess is killed and its temporary directory is deleted.
#
# The parameter to run_with_temporary_dir() is a function which receives the
# new process id and the new temporary directory's path, and builds the
# command line to run and map of extra environment variables. This function
# can put files in the directory (which already exists when it is called).
# See below the example run_scylla_cmd.
def pid_to_dir(pid):
return os.path.join(os.getenv('TMPDIR', '/tmp'), 'scylla-test-'+str(pid))
def run_with_generated_dir(run_cmd_generator, run_dir_generator):
global run_with_temporary_dir_pids
global run_pytest_pids
# Below, there is a small time window, after we fork and the child
# started running but before we save this child's process id in
# run_with_temporary_dir_pids. In that small time window, a signal may
# kill the parent process but not cleanup the child. So we use sigmask
# to postpone signal delivery during that time window:
mask = signal.pthread_sigmask(signal.SIG_BLOCK, {})
signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT, signal.SIGQUIT, signal.SIGTERM})
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid == 0:
# Child
run_with_temporary_dir_pids = set() # no children to clean up on child
run_pytest_pids = set()
pid = os.getpid()
dir = run_dir_generator(pid)
(cmd, env) = run_cmd_generator(pid, dir)
# redirect stdout and stderr to log file, as in a shell's >log 2>&1:
log = os.path.join(dir, 'log')
fd = os.open(log, os.O_WRONLY | os.O_CREAT | os.O_APPEND, mode=0o666)
sys.stdout.flush()
os.close(1)
os.dup2(fd, 1)
sys.stderr.flush()
os.close(2)
os.dup2(fd, 2)
# Detach child from parent's "session", so that a SIGINT will be
# delivered just to the parent, not to the child. Instead, the parent
# will eventually deliver a SIGKILL as part of cleanup_all().
os.setsid()
os.execve(cmd[0], cmd, dict(os.environ, **env))
# execve will not return. If it cannot run the program, it will raise
# an exception.
# Parent
run_with_temporary_dir_pids.add(pid)
signal.pthread_sigmask(signal.SIG_SETMASK, mask)
return pid
def make_new_tempdir(pid):
dir = pid_to_dir(pid)
os.mkdir(dir)
return dir
def run_with_temporary_dir(run_cmd_generator):
return run_with_generated_dir(run_cmd_generator, make_new_tempdir)
def restart_with_dir(old_pid, run_cmd_generator, dir):
try:
os.killpg(old_pid, 2)
os.waitpid(old_pid, 0)
except ProcessLookupError:
pass
scylla_link = os.path.join(dir, 'test_scylla')
os.unlink(scylla_link)
return run_with_generated_dir(run_cmd_generator, lambda pid : dir)
# run_with_temporary_dir_pids is a set of process ids previously created
# by run_with_temporary_dir(). On exit, the processes listed here are
# cleaned up. Note that there is a known mapping (pid_to_dir()) from each
# pid to the temporary directory - which will also be removed.
run_with_temporary_dir_pids = set()
# killpg_retry() behaves like killpg() (kill process-group), excepts that
# after trying the given signal (sig), if the main process (pid) doesn't
# exit within a given timeout, it is killed again with SIGKILL. In any case,
# the other processes in the process group besides pid itself are killed
# with SIGKILL at the end.
def killpg_retry(pid, sig, timeout):
os.killpg(pid, sig)
deadline = time.time() + timeout
sleeptime = 0.01
while time.time() < deadline:
# To check if the process died, we need to first get rid of the
# zombie (if it exists) with waitpid, and then check if the process
# still exists, with kill.
try:
os.waitpid(pid, os.WNOHANG)
os.kill(pid, 0)
except (ProcessLookupError, ChildProcessError):
# The process is dead, we're done. But just in case the process
# itself is dead but other processes in its process group survived,
# kill them all again with SIGKILL.
try:
os.killpg(pid, signal.SIGKILL)
except (ProcessLookupError, ChildProcessError):
pass
return
time.sleep(sleeptime)
sleeptime = sleeptime * 2
# The gentle signal didn't work (or didn't work quickly enough).
# Fall back to killing with SIGKILL, that is guaranteed to work.
try:
os.killpg(pid, signal.SIGKILL)
except (ProcessLookupError, ChildProcessError):
pass
# abort_run_with_temporary_dir() kills a process started earlier by
# run_with_temporary_directory, and and removes its temporary directory.
# Currently, the log file is opened and returned so the caller can show
# it to the standard output even after the directory is removed. In the
# future we may want to change this - and save the log somewhere instead
# of copying it to stdout.
def abort_run_with_dir(pid, tmpdir):
try:
# We can use os.killpg(pid, signal.SIGKILL) to always kill Scylla
# immediately with SIGKILL, but trying SIGTERM first has a tiny
# overhead (0.02 seconds), and allows testing the shutdown path
# and flushing test-coverage data in a coverage run.
killpg_retry(pid, signal.SIGTERM, 10)
os.waitpid(pid, 0) # don't leave an annoying zombie
except (ProcessLookupError, ChildProcessError):
pass
# We want to read tmpdir/log to stdout, but if stdout is piped, this can
# take a long time and be interrupted. We don't want the rmtree() below
# to not happen in that case. So we need to open the log file first,
# delete the directory (the open file will not be really deleted unti we
# close it) - and only then start showing the log file.
f = open(os.path.join(tmpdir, 'log'), 'rb')
# Be paranoid about rmtree accidentally removing the entire disk...
# TODO: check tmpdir is actually in TMPDIR and refuse to remove it
# if not.
if tmpdir != '/':
shutil.rmtree(tmpdir)
return f
def abort_run_with_temporary_dir(pid):
return abort_run_with_dir(pid, pid_to_dir(pid))
if omit_scylla_output := "--omit-scylla-output" in sys.argv:
sys.argv.remove("--omit-scylla-output") # don't pass this option to pytest
summary=''
run_pytest_pids = set()
def cleanup_all():
global omit_scylla_output
global summary
global run_with_temporary_dir_pids
global run_pytest_pids
# Kill pytest first, before killing the tested server, so we don't
# continue to get a barrage of errors when the test runs with the
# server killed.
for pid in run_pytest_pids:
try:
os.killpg(pid, 9)
os.waitpid(pid, 0) # don't leave an annoying zombie
except ProcessLookupError:
pass
for pid in run_with_temporary_dir_pids:
f = abort_run_with_temporary_dir(pid)
if not omit_scylla_output:
print('\nSubprocess output:\n')
sys.stdout.flush()
shutil.copyfileobj(f, sys.stdout.buffer)
f.close()
print(summary)
# We run the cleanup_all() function on exit for any reason - successful finish
# of the script, an uncaught exception, or a signal. It ensures that the
# subprocesses (e.g., Scylla) is killed and its temporary storage directory
# is deleted. It also shows the subprocesses's output log.
atexit.register(cleanup_all)
# If Python doesn't catch a particular signal, the atexit handler will not
# get called. By default SIGINT is caught, but SIGTERM and SIGHUP are not,
# so let's catch them explicitly:
for sig in [signal.SIGTERM, signal.SIGHUP]:
signal.signal(sig, lambda sig, frame:
sys.exit(f'Received signal {signal.Signals(sig).name}. Exiting.'))
##############################
# When we run a server - e.g., Scylla or Cassandra - we want to
# have it listen on a unique IP address so it doesn't collide with other
# servers run by other concurrent tests. Luckily, Linux allows us to use any
# IP address in the range 127/8 (i.e., 127.*.*.*). If we pick an IP address
# based on the server's main process id, we know two servers will not
# get the same IP address. We avoid 127.0.*.* because CCM (a different test
# framework) assumes it will be available for it to run Scylla instances.
# 127.255.255.255 is also illegal. So we use 127.{1-254}.*.*.
# This gives us a total of 253*255*255 possible IP addresses - which is
# significantly more than /proc/sys/kernel/pid_max on any system I know.
def pid_to_ip(pid):
bytes = pid.to_bytes(3, byteorder='big')
return '127.' + str(bytes[0]+1) + '.' + str(bytes[1]) + '.' + str(bytes[2])
##############################
# Specific code for running *Scylla*:
import cassandra.cluster
import ssl
# Find a Scylla executable. By default, we take the build/*/scylla
# executable next to the location of this script, provided it's the
# only one that matches this wildcard, but this can be overridden
# by setting a SCYLLA environment variable:
source_path = os.path.realpath(os.path.join(__file__, '../../..'))
scylla = None
def find_scylla():
global scylla
global source_path
if scylla:
return scylla
if os.getenv('SCYLLA'):
scylla = os.path.abspath(os.getenv('SCYLLA'))
else:
scyllas = glob.glob(os.path.join(source_path, 'build/*/scylla'))
if not scyllas:
print("Can't find a Scylla executable in {}.\nPlease build Scylla or set SCYLLA to the path of a Scylla executable.".format(source_path))
exit(1)
if len(scyllas) > 1:
print("Found several scylla executables and cannot chose one.\nPlease set SCYLLA to one of the following paths:\n{}".format('\n'.join(scyllas)))
exit(1)
scylla = scyllas[0]
if not os.access(scylla, os.X_OK):
print("Cannot execute '{}'.\nPlease set SCYLLA to the path of a Scylla executable.".format(scylla))
exit(1)
return scylla
def run_scylla_cmd(pid, dir):
ip = pid_to_ip(pid)
print('Booting Scylla on ' + ip + ' in ' + dir + '...')
global scylla
global source_path
# To make things easier for users of "killall", "top", and similar,
# we want the Scylla executable which we run during the test to have
# a different name from manual runs of Scylla. Unfortunately, using
# execve() to change just argv[0] isn't good enough - because killall
# inspects the actual executable filename in /proc/<pid>/stat. So we
# need to name the executable differently. Luckily, using a symbolic
# link is good enough.
scylla_link = os.path.join(dir, 'test_scylla')
os.symlink(scylla, scylla_link)
# When running a Scylla build with sanitizers enabled, we should
# configure them to fail on real errors, and ignore spurious errors.
env = {
'UBSAN_OPTIONS': f'halt_on_error=1:abort_on_error=1:suppressions={source_path}/ubsan-suppressions.supp',
'ASAN_OPTIONS': 'disable_coredump=0:abort_on_error=1:detect_stack_use_after_returns=1'
}
return ([scylla_link,
'--options-file', source_path + '/conf/scylla.yaml',
'--developer-mode', '1',
'--ring-delay-ms', '0',
'--collectd', '0',
'--smp', '2',
'-m', '1G',
'--overprovisioned',
'--max-networking-io-control-blocks=1000',
'--unsafe-bypass-fsync', '1',
'--kernel-page-cache=1',
'--commitlog-use-o-dsync', '0',
'--flush-schema-tables-after-modification=false',
'--api-address', ip,
'--rpc-address', ip,
'--listen-address', ip,
'--prometheus-address', ip,
'--seed-provider-parameters', 'seeds=' + ip,
'--workdir', dir,
'--auto-snapshot', '0',
'--skip-wait-for-gossip-to-settle', '0',
'--logger-log-level', 'compaction=warn',
'--logger-log-level', 'migration_manager=warn',
# Use lower settings for some parameters to allow faster testing
'--num-tokens', '16',
'--query-tombstone-page-limit=1000',
# Significantly increase default timeouts to allow running tests
# on a very slow setup (but without network losses). Note that these
# are server-side timeouts: The client should also avoid timing out
# its own requests - for this reason we increase the CQL driver's
# client-side timeout in conftest.py.
'--range-request-timeout-in-ms', '300000',
'--read-request-timeout-in-ms', '300000',
'--counter-write-request-timeout-in-ms', '300000',
'--cas-contention-timeout-in-ms', '300000',
'--truncate-request-timeout-in-ms', '300000',
'--write-request-timeout-in-ms', '300000',
'--request-timeout-in-ms', '300000',
'--user-defined-function-time-limit-ms', '1000',
'--group0-raft-op-timeout-in-ms=300000',
# Allow testing experimental features. Following issue #9467, we need
# to add here specific experimental features as they are introduced.
# Note that Alternator-specific experimental features are listed in
# test/alternator/run.
'--experimental-features=udf',
'--experimental-features=keyspace-storage-options',
'--experimental-features=views-with-tablets',
'--enable-tablets=true',
'--enable-user-defined-functions', '1',
# Views with tablets refuse to work if this option is not on :-(
'--rf-rack-valid-keyspaces=1',
# Set up authentication in order to allow testing this module
# and other modules dependent on it: e.g. service levels
'--authenticator', 'PasswordAuthenticator',
'--authorizer', 'CassandraAuthorizer',
'--strict-allow-filtering=true',
'--strict-is-not-null-in-views=true',
'--permissions-update-interval-in-ms', '100',
'--permissions-validity-in-ms', '5',
'--shutdown-announce-in-ms', '0',
'--maintenance-socket=workdir',
'--service-levels-interval-ms=500',
'--tablets-initial-scale-factor=1',
# Avoid unhelpful "guardrails" warnings
'--minimum-replication-factor-warn-threshold=-1',
# The expiration scanner's period (started for Alternator but
# now also CQL)
'--alternator-ttl-period-in-seconds=0.5',
], env)
# Same as run_scylla_cmd, just use SSL encryption for the CQL port (same
# port number as default - replacing the unencrypted server)
def run_scylla_ssl_cql_cmd(pid, dir):
(cmd, env) = run_scylla_cmd(pid, dir)
setup_ssl_certificate(dir)
cmd += ['--client-encryption-options', 'enabled=true',
'--client-encryption-options', f'keyfile={dir}/scylla.key',
'--client-encryption-options', f'certificate={dir}/scylla.crt',
]
return (cmd, env)
# Download the requested precompiled Scylla release using fetch_scylla.py,
# and cache it in a subdirectory of <source_path>/build whose name is
# the specific release actually download.
# This function returns a path of the executable to run Scylla (actually,
# a shell-script wrapper that sets the LD_LIBRARY_PATH appropriately).
def download_precompiled_scylla(release):
import fetch_scylla
return fetch_scylla.download_scylla(release, os.path.join(source_path, 'build'))
# Instead of run_scylla_cmd, which runs Scylla executable compiled in this
# build directory, run_precompiled_scylla_cmd runs a release of Scylla
# downloaded by fetch_precompiled_scylla:
def run_precompiled_scylla_cmd(exe, pid, dir):
(cmd, env) = run_scylla_cmd(pid, dir)
# run_scylla_cmd linked "test_scylla" to the built Scylla, we want to
# link it to the wrapper script we just downloaded:
scylla_link = os.path.join(dir, 'test_scylla')
os.unlink(scylla_link)
os.symlink(exe, scylla_link)
# Unfortunately, earlier Scylla versions required different command line
# options to run, so for some old versions we need to drop some of the
# command line options added above in run_scylla_cmd, or add more
# options. We do this hard-coded for particular versions, which is
# kind of ugly and high-maintenance :-( Maybe in the future we could
# detect this automatically by using "--help" or something.
version = os.path.basename(os.path.dirname(exe)).split('~')[0].split('.')
major = [int(version[0]), int(version[1])]
enterprise = major[0] > 2000
if major[0] < 6 or (enterprise and major <= [2024,1]):
cmd.remove('--enable-tablets=true')
cmd.remove('--maintenance-socket=workdir')
cmd.remove('--service-levels-interval-ms=500')
if major < [5,4] or (enterprise and major <= [2023,1]):
cmd.remove('--strict-is-not-null-in-views=true')
cmd.remove('--minimum-replication-factor-warn-threshold=-1')
if major <= [5,1] or (enterprise and major <= [2022,2]):
cmd.remove('--query-tombstone-page-limit=1000')
if major <= [5,0] or (enterprise and major <= [2022,1]):
cmd.remove('--experimental-features=keyspace-storage-options')
if major <= [4,5] or (enterprise and major <= [2021,1]):
cmd.remove('--kernel-page-cache=1')
cmd.remove('--flush-schema-tables-after-modification=false')
cmd.remove('--strict-allow-filtering=true')
if major <= [6,2] or (enterprise and major < [2025,1]):
cmd.remove('--experimental-features=views-with-tablets')
if major <= [4,5]:
cmd.remove('--max-networking-io-control-blocks=1000')
if major == [5,4] or major == [2024,1]:
cmd.append('--force-schema-commit-log=true')
if major < [2025,1]:
cmd.remove('--tablets-initial-scale-factor=1')
cmd.remove('--rf-rack-valid-keyspaces=1')
if major < [2025,2]:
cmd.remove('--group0-raft-op-timeout-in-ms=300000')
return (cmd, env)
# Get a Cluster object to connect to CQL at the given IP address (and with
# the appropriate username and password). It's important to shutdown() this
# Cluster object when done with it, otherwise we can get errors at the end
# of the run when background tasks continue to spawn futures after exit.
def get_cql_cluster(ip, ssl_context=None):
auth_provider = cassandra.auth.PlainTextAuthProvider(username='cassandra', password='cassandra')
return cassandra.cluster.Cluster([ip],
auth_provider=auth_provider,
ssl_context=ssl_context,
# The default timeout for new connections is 5 seconds, and for
# requests made by the control connection is 2 seconds. These should
# have been more than enough, but in some extreme cases with a very
# slow debug build running on a very busy machine, they may not be.
# so let's increase them to 60 seconds. See issue #13239.
connect_timeout = 60,
control_connection_timeout = 60)
## Test that CQL is serving, for wait_for_services() below.
def check_cql(ip, ssl_context=None):
try:
cluster = get_cql_cluster(ip, ssl_context)
cluster.connect()
cluster.shutdown()
except cassandra.cluster.NoHostAvailable:
raise NotYetUp
# Any other exception may indicate a problem, and is passed to the caller.
def check_ssl_cql(ip):
# Note that Scylla does not support any earlier TLS protocol. If you
# try, you get mysterious EOF errors (see issue #6971) :-(
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
check_cql(ip, ssl_context)
# Test that the Scylla REST API is serving.
# Can be used as a checker function with wait_for_services() below.
def check_rest_api(ip, port=10000):
try:
requests.get(f"http://{ip}:{port}/")
# getting "/" returns error status 404 but we don't care
# as long as the server returns it
except requests.exceptions.ConnectionError:
raise NotYetUp
# Any other exception may indicate a problem, and is passed to the caller.
# wait_for_services() waits for scylla to finish booting successfully and
# listen to services checked by the given "checkers". Raises an exception
# if we know Scylla did not boot properly (as soon as we know - not waiting
# for a timeout).
#
# Each checker is a function which returns successfully if the service it
# checks is up, or throws an exception if it is not. If the service is not
# *yet* up, it should throw the NotYetUp exception, indicating that
# wait_for_services() should continue to retry. Any other exceptions means
# an unrecoverable error was detected, and retry would be hopeless.
#
# wait_for_services has a hard-coded timeout of 200 seconds.
class NotYetUp(Exception):
pass
def wait_for_services(pid, checkers):
start_time = time.time()
ready = False
while time.time() < start_time + 200:
time.sleep(0.1)
# To check if Scylla died already (i.e., failed to boot), we need
# to first get rid of the zombie (if it exists) with waitpid, and
# then check if the process still exists, with kill.
try:
os.waitpid(pid, os.WNOHANG)
os.kill(pid, 0)
except (ProcessLookupError, ChildProcessError):
# Scylla is dead, we cannot recover
break
try:
for checker in checkers:
checker()
# If all checkers passed, we're finally done
ready = True
break
except NotYetUp:
pass
duration = str(round(time.time() - start_time, 1)) + ' seconds'
if not ready:
print(f'Boot failed after {duration}.')
# Run the checkers again, not catching NotYetUp, to show exception
# traces of which of the checks failed and how.
os.waitpid(pid, os.WNOHANG)
os.kill(pid, 0)
for checker in checkers:
checker()
print(f'Boot successful ({duration}).')
sys.stdout.flush()
def wait_for_cql(pid, ip):
wait_for_services(pid, [lambda: check_cql(ip)])
def run_pytest(pytest_dir, additional_parameters):
global run_with_temporary_dir_pids
global run_pytest_pids
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid == 0:
# child:
run_with_temporary_dir_pids = set() # no children to clean up on child
run_pytest_pids = set()
os.environ.setdefault('SCYLLA_TEST_RUNNER', 'runpy')
os.chdir(pytest_dir)
os.setsid()
os.execvp('pytest', ['pytest',
'-o', 'junit_family=xunit2'] + additional_parameters)
exit(1)
# parent:
run_pytest_pids.add(pid)
if os.waitpid(pid, 0)[1]:
return False
else:
return True
# Set up self-signed SSL certificate in dir/scylla.key, dir/scylla.crt.
# These can be used for setting up an HTTPS server for Alternator, or for
# any other part of Scylla which needs SSL.
def setup_ssl_certificate(dir):
# FIXME: error checking (if "openssl" isn't found, for example)
os.system(f'openssl genrsa 2048 > "{dir}/scylla.key"')
os.system(f'openssl req -new -x509 -nodes -sha256 -days 365 -subj "/C=IL/ST=None/L=None/O=None/OU=None/CN=example.com" -key "{dir}/scylla.key" -out "{dir}/scylla.crt"')