There is a race condition in the driver that raises a RuntimeException. Because this pollutes the output, this PR simply silences the exception. Fixes: SCYLLADB-900 Closes scylladb/scylladb#28957
383 lines
15 KiB
Python
383 lines
15 KiB
Python
# Copyright 2020-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
##################################################################
|
|
|
|
# Various utility functions which are useful for multiple tests.
|
|
# Note that fixtures aren't here - they are in conftest.py.
|
|
|
|
import string
|
|
import random
|
|
import time
|
|
import socket
|
|
import os
|
|
import requests
|
|
import collections
|
|
import ssl
|
|
from contextlib import contextmanager
|
|
|
|
from cassandra.auth import PlainTextAuthProvider
|
|
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT
|
|
from cassandra.policies import RoundRobinPolicy
|
|
|
|
from test.pylib.driver_utils import safe_driver_shutdown
|
|
|
|
def random_string(length=10, chars=string.ascii_uppercase + string.digits):
    """Return a random string of `length` characters drawn from `chars`.

    By default the characters are uppercase ASCII letters and digits.
    """
    picked = []
    for _ in range(length):
        picked.append(random.choice(chars))
    return ''.join(picked)
|
|
|
|
def random_bytes(length=10):
    """Return a bytearray holding `length` random byte values."""
    buf = bytearray()
    for _ in range(length):
        buf.append(random.getrandbits(8))
    return buf
|
|
|
|
# Sleep until `seconds` whole seconds past the start of the current
# wall-clock second, i.e. the sleep duration is `seconds` minus the
# fraction of the current second that has already elapsed.
# Used to let a ttl (of `seconds`) expire, and to let the commitlog minimum
# gc time, in seconds, become greater than the tombstone deletion time.
def sleep_till_whole_second(seconds=1):
    now = time.time()
    elapsed_fraction = now - int(now)
    time.sleep(seconds - elapsed_fraction)
|
|
|
|
# Pick a unique name for a test keyspace or table.
# The name never needs quoting in CQL: it consists only of lowercase
# letters, digits and underscores, and it starts with a letter.
unique_name_prefix = 'cql_test_'
def unique_name():
    now_ms = int(round(time.time() * 1000))
    # The name is derived from the current time in milliseconds. If this is
    # called again within the same millisecond, step past the last value
    # handed out so that the result is still unique.
    stamp = max(now_ms, unique_name.last_ms + 1)
    unique_name.last_ms = stamp
    return unique_name_prefix + str(stamp)
unique_name.last_ms = 0
|
|
|
|
# Functions for picking a unique key to use when multiple tests want to use
|
|
# the same shared table and need to pick different keys so as not to collide.
|
|
# Because different runs do not share the same table (unique_name() above
|
|
# is used to pick the table name), the uniqueness of the keys we generate
|
|
# here does not need to be global - we can just use a simple counter to
|
|
# guarantee uniqueness.
|
|
def unique_key_string():
    """Return a string key ('s1', 's2', ...) that no earlier call returned.

    Uniqueness comes from a simple counter kept on the function object.
    """
    unique_key_string.i = unique_key_string.i + 1
    return 's' + str(unique_key_string.i)
unique_key_string.i = 0
|
|
|
|
def unique_key_int():
    """Return an integer key (1, 2, ...) that no earlier call returned.

    Uniqueness comes from a simple counter kept on the function object.
    """
    next_key = unique_key_int.i + 1
    unique_key_int.i = next_key
    return next_key
unique_key_int.i = 0
|
|
|
|
|
|
def is_scylla(cql):
    """ Check whether we are running against Scylla or not """
    # Scylla is recognized by having at least one table in the "system"
    # keyspace whose name contains the word "scylla".
    result = cql.execute("SELECT * FROM system_schema.tables WHERE keyspace_name = 'system'")
    table_names = []
    for row in result:
        table_names.append(row.table_name)
    for table_name in table_names:
        if 'scylla' in table_name:
            return True
    return False
|
|
|
|
def keyspace_has_tablets(cql, keyspace):
    """ Return true if the keyspace was created with tablets.

    We support running cqlpy against an older version of scylla, so we do
    the detection in a way that accounts for scylla possibly not even knowing
    what tablets is.
    For cassandra, this will always return no.

    If the keyspace was created with tablets, it will have an entry in
    `system_schema.scylla_keyspaces`, with `initial_tablets` set.
    So here, we simply query this table, looking for a partition for the
    appropriate keyspace. If the result has the `initial_tablets` column and it
    is set, the keyspace has tablets.
    """
    if not is_scylla(cql):
        return False

    try:
        res = list(cql.execute(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'"))
    except Exception:
        # Antique versions of Scylla are is_scylla() but did not have
        # the scylla_keyspaces table. They didn't have tablets either, so
        # we should just return False.
        # Only Exception is caught (not a bare except), so that
        # KeyboardInterrupt and SystemExit still propagate.
        return False
    # The row might exist due to storage related options, but the tablets
    # related fields are null.
    # So we check that:
    # * the row exists
    # * `initial_tablets` has a value
    if not res:
        return False
    return getattr(res[0], "initial_tablets", None) is not None
|
|
|
|
# Context manager that creates a new temporary keyspace with the given
# options and drops it again when the "with" block ends. Usage:
#   with new_test_keyspace(cql, '...') as keyspace:
# This is not a fixture - see those in conftest.py.
@contextmanager
def new_test_keyspace(cql, opts):
    keyspace = unique_name()
    create_stmt = " ".join(("CREATE KEYSPACE", keyspace, opts))
    cql.execute(create_stmt)
    try:
        yield keyspace
    finally:
        # Runs whether or not the body of the "with" raised.
        cql.execute(f"DROP KEYSPACE {keyspace}")
|
|
|
|
# Context manager that creates a new temporary table with a given schema
# and drops it again when the "with" block ends.
# Because Scylla becomes slower when a huge number of uniquely-named tables
# are created and deleted (see https://github.com/scylladb/scylla/issues/7620)
# we keep a list of table names that were used before and have since been
# dropped, and reuse one of those names when one is available.
# Usage:
#   with new_test_table(cql, test_keyspace, '...') as table:
previously_used_table_names = []
@contextmanager
def new_test_table(cql, keyspace, schema, extra=""):
    if previously_used_table_names:
        table_name = previously_used_table_names.pop()
    else:
        table_name = unique_name()
    table = ".".join((keyspace, table_name))
    cql.execute("".join(("CREATE TABLE ", table, "(", schema, ")", extra)))
    try:
        yield table
    finally:
        cql.execute(f"DROP TABLE {table}")
        # The name goes back on the reuse list only after a successful DROP.
        previously_used_table_names.append(table_name)
|
|
|
|
# Context manager that creates a new temporary user-defined type and drops
# it again when the "with" block ends. If `name` is not given, a unique
# name is generated. Yields the keyspace-qualified type name.
@contextmanager
def new_type(cql, keyspace, cmd, name=None):
    if not name:
        name = unique_name()
    type_name = ".".join((keyspace, name))
    cql.execute(" ".join(("CREATE TYPE", type_name, cmd)))
    try:
        yield type_name
    finally:
        cql.execute(f"DROP TYPE {type_name}")
|
|
|
|
# Context manager that creates a new temporary user-defined function and
# drops it again when the "with" block ends. If `name` is not given, a
# unique name is generated. Yields the function name without the keyspace.
# If `args` is given, it is placed in parentheses after the function name
# in the DROP statement.
@contextmanager
def new_function(cql, keyspace, body, name=None, args=None):
    fun = name or unique_name()
    cql.execute(f"CREATE FUNCTION {keyspace}.{fun} {body}")
    try:
        yield fun
    finally:
        signature = f"({args})" if args else ""
        cql.execute(f"DROP FUNCTION {keyspace}.{fun}{signature}")
|
|
|
|
# Context manager that creates a new temporary user-defined aggregate with
# a generated unique name and drops it again when the "with" block ends.
# Yields the aggregate name without the keyspace.
@contextmanager
def new_aggregate(cql, keyspace, body):
    aggr = unique_name()
    qualified_name = f"{keyspace}.{aggr}"
    cql.execute(f"CREATE AGGREGATE {qualified_name} {body}")
    try:
        yield aggr
    finally:
        cql.execute(f"DROP AGGREGATE {qualified_name}")
|
|
|
|
# Context manager that creates a new temporary materialized view on an
# existing table and drops it again when the "with" block ends.
# `table` is expected to be keyspace-qualified ("ks.table"); the view is
# created in that same keyspace under a generated unique name, and the
# qualified view name is yielded.
@contextmanager
def new_materialized_view(cql, table, select, pk, where, extra=""):
    keyspace = table.partition('.')[0]
    mv = keyspace + "." + unique_name()
    create_stmt = (
        f"CREATE MATERIALIZED VIEW {mv} AS SELECT {select} FROM {table} "
        f"WHERE {where} PRIMARY KEY ({pk}) {extra}"
    )
    cql.execute(create_stmt)
    try:
        yield mv
    finally:
        cql.execute(f"DROP MATERIALIZED VIEW {mv}")
|
|
|
|
# Context manager that creates a new temporary secondary index on an
# existing table and drops it again when the "with" block ends.
# `table` is expected to be keyspace-qualified ("ks.table"). If `name` is
# empty, a unique index name is generated. Yields "keyspace.index_name".
@contextmanager
def new_secondary_index(cql, table, column, name='', extra=''):
    keyspace = table.partition('.')[0]
    index_name = name if name else unique_name()
    qualified_name = f"{keyspace}.{index_name}"
    cql.execute(f"CREATE INDEX {index_name} ON {table} ({column}) {extra}")
    try:
        yield qualified_name
    finally:
        cql.execute(f"DROP INDEX {qualified_name}")
|
|
|
|
def index_table_name(index_name : str):
    """Return `index_name` with the suffix "_index" appended."""
    return "{}_index".format(index_name)
|
|
|
|
# Helper context manager for establishing a connection with a given
# username and password. Yields the object returned by cluster.connect(),
# and hands the cluster to safe_driver_shutdown() when the "with" block ends.
@contextmanager
def cql_session(host, port, is_ssl, username, password, request_timeout=120, protocol_version=4):
    # The default timeout (in seconds) for execute() commands is 10, which
    # should have been more than enough, but in some extreme cases with a
    # very slow debug build running on a very busy machine and a very slow
    # request (e.g., a DROP KEYSPACE needing to drop multiple tables)
    # 10 seconds may not be enough, so let's increase it. See issue #7838.
    default_profile = ExecutionProfile(
        load_balancing_policy=RoundRobinPolicy(),
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        request_timeout=request_timeout,
    )
    # Scylla does not support any earlier TLS protocol. If you try,
    # you will get mysterious EOF errors (see issue #6971) :-(
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) if is_ssl else None
    cluster_options = dict(
        execution_profiles={EXEC_PROFILE_DEFAULT: default_profile},
        contact_points=[host],
        port=int(port),
        protocol_version=protocol_version,
        auth_provider=PlainTextAuthProvider(username=username, password=password),
        ssl_context=ssl_context,
        # The default timeout for new connections is 5 seconds, and for
        # requests made by the control connection is 2 seconds. These should
        # have been more than enough, but in some extreme cases with a very
        # slow debug build running on a very busy machine, they may not be.
        # so let's increase them to 60 seconds. See issue #11289.
        connect_timeout=60,
        control_connection_timeout=60,
    )
    cluster = Cluster(**cluster_options)
    try:
        # connect() is inside the try so the cluster is shut down even if
        # connecting fails.
        yield cluster.connect()
    finally:
        safe_driver_shutdown(cluster)
|
|
|
|
@contextmanager
def new_user(cql, username='', with_superuser_privileges=False):
    """Create a temporary login role and drop it when the "with" block ends.

    If `username` is empty, a unique name is generated. The role's password
    is set to the same string as its name. Yields the role name.
    """
    role = username if username else unique_name()
    create_stmt = (
        f"CREATE ROLE {role} WITH PASSWORD = '{role}' "
        f"AND SUPERUSER = {with_superuser_privileges} AND LOGIN = true"
    )
    cql.execute(create_stmt)
    try:
        yield role
    finally:
        cql.execute(f"DROP ROLE {role}")
|
|
|
|
@contextmanager
def new_session(cql, username):
    """Open a separate session as `username`, on the endpoint of cql.hosts[0].

    The password passed to cql_session() is the same string as the username,
    and SSL is used if the cluster of `cql` has an ssl_context. Yields the
    new session.
    """
    endpoint = cql.hosts[0].endpoint
    with cql_session(host=endpoint.address, port=endpoint.port, is_ssl=(cql.cluster.ssl_context is not None), username=username, password=username) as session:
        try:
            yield session
        finally:
            # Shut the session down even when the body of the "with" raised.
            session.shutdown()
|
|
|
|
# new_cql() yields a new object similar to the given cql fixture: it is
# obtained from the same cluster object, but is a separate connection.
# This can be useful for tests which require a separate connection -
# for example for testing the "USE" statement (which, after used once
# on a connection, cannot be undone).
# The new session is shut down when the "with" block ends.
@contextmanager
def new_cql(cql):
    separate_session = cql.cluster.connect()
    try:
        yield separate_session
    finally:
        separate_session.shutdown()
|
|
|
|
def project(column_name_string, rows):
    """Returns a list holding, for each row, its `column_name_string` attribute."""
    values = []
    for row in rows:
        values.append(getattr(row, column_name_string))
    return values
|
|
|
|
# Utility function for trying to find a local process which is listening to
# the address and port to which our CQL connection is connected. If such a
# process exists, return its process id (as a string). Otherwise, return None.
# Note that the local process needs to belong to the same user running this
# test, or it cannot be found.
def local_process_id(cql):
    ip = socket.gethostbyname(cql.cluster.contact_points[0])
    port = cql.cluster.port
    # Implement something like the shell "lsof -Pni @{ip}:{port}", just
    # using /proc without any external shell command.
    # First, we look in /proc/net/tcp for a LISTEN socket (state 0x0A) at the
    # desired local address. The address is specially-formatted hex of the ip
    # and port, with 0100007F:2352 for 127.0.0.1:9042. We check for two
    # listening addresses: one is the specific IP address given, and the
    # other is listening on address 0 (INADDR_ANY).
    def ip2hex(addr):
        return ''.join(f'{int(octet):02X}' for octet in reversed(addr.split('.')))

    port_hex = f'{int(port):04X}'
    wanted_addresses = (
        ip2hex(ip) + ':' + port_hex,
        ip2hex('0.0.0.0') + ':' + port_hex,
    )
    LISTEN = '0A'
    inode = None
    with open('/proc/net/tcp', 'r') as f:
        for line in f:
            cols = line.split()
            # Skip lines too short to hold the inode column (e.g. a blank
            # line) instead of failing with an IndexError.
            if len(cols) <= 9:
                continue
            if cols[3] == LISTEN and cols[1] in wanted_addresses:
                inode = cols[9]
                break
    if inode is None:
        # Didn't find a process listening on the given address
        return None
    # Now look in /proc/*/fd/* for processes that have this socket "inode"
    # as one of its open files. We can only find a process that belongs to
    # the same user.
    target = f'socket:[{inode}]'
    for proc in os.listdir('/proc'):
        if not proc.isnumeric():
            continue
        fd_dir = f'/proc/{proc}/fd/'
        try:
            fds = os.listdir(fd_dir)
        except OSError:
            # Ignore errors. We can't check processes we don't own, and a
            # process may also exit while we are scanning.
            continue
        for fd in fds:
            try:
                if os.readlink(fd_dir + fd) == target:
                    # Found the process!
                    return proc
            except OSError:
                # The descriptor may have been closed in the meantime.
                pass
    return None
|
|
|
|
# user_type("a", 1, "b", 2) creates a named tuple with component names "a", "b"
# and values 1, 2. The return of this function can be used to bind to a UDT.
# Arguments alternate name, value, name, value, ...; their number is assumed
# to be even.
def user_type(*args):
    field_names = args[0::2]
    field_values = args[1::2]
    udt_class = collections.namedtuple('user_type', field_names)
    return udt_class(*field_values)
|
|
|
|
class config_value_context:
    """Change the value of a config item while the context is active.

    The config item has to be live-updatable.

    On entry the current value is read from system.config and remembered,
    then the new value is written. On exit the remembered value is written
    back.
    """
    def __init__(self, cql, key, value):
        self._cql, self._key, self._value = cql, key, value
        # Filled in by __enter__ with the value found before the change.
        self._original_value = None
        self._select = cql.prepare("SELECT value FROM system.config WHERE name=?")
        self._update = cql.prepare("UPDATE system.config SET value=? WHERE name=?")

    def __enter__(self):
        current_row = self._cql.execute(self._select, (self._key,)).one()
        self._original_value = current_row.value
        self._cql.execute(self._update, (self._value, self._key))

    def __exit__(self, exc_type, exc_value, exc_traceback):
        restore_params = (self._original_value, self._key)
        self._cql.execute(self._update, restore_params)
|
|
|
|
class ScyllaMetrics:
    """A snapshot of metrics text, held as a list of lines.

    Lines of interest look like `name{key="value",...} number`.
    """
    def __init__(self, lines):
        self._lines = lines

    @staticmethod
    def query(cql):
        """Fetch the metrics page (port 9180) of the first contact point."""
        host = cql.cluster.contact_points[0]
        response = requests.get(f'http://{host}:9180/metrics')
        return ScyllaMetrics(response.text.split('\n'))

    def get(self, name, labels = None, shard='total'):
        """Return the value of the metric lines starting with `name`.

        With shard='total', the values of all matching lines are summed.
        Otherwise only the first line whose "shard" label equals `shard`
        is used. If `labels` is given, every non-shard label on a line must
        be present in `labels` with the same value for the line to match.
        Returns None if no line matches. Raises ValueError if a line that
        starts with `name` has no {...} label section.
        """
        def label_matches(pair):
            key, val = pair.split('=')
            val = val.strip('"')
            if key == 'shard':
                return shard == 'total' or val == shard
            return labels is None or labels.get(key, None) == val

        total = None
        for line in self._lines:
            if not line.startswith(name):
                continue
            open_brace = line.find('{')
            close_brace = line.find('}')
            if open_brace == -1 or close_brace == -1:
                raise ValueError(f'invalid metric format [{line}]')
            pairs = line[open_brace + 1:close_brace].split(',')
            if not all(label_matches(pair) for pair in pairs):
                continue
            # The number starts two characters after the closing brace.
            value = float(line[close_brace + 2:])
            total = value if total is None else total + value
            if shard != 'total':
                # A specific shard was requested: the first match is enough.
                break
        return total
|