Files
scylladb/test/cluster/dtest/dtest_class.py
Michał Chojnowski 6b413e3959 cluster/dtest/bypass_cache_test: switch from highest_supported_sstable_format to chosen_sstable_format
Trie-based indexes and older indexes have a difference in metrics,
and the test uses the metrics to check for bypass cache.
To choose the right metrics, it uses highest_supported_sstable_format,
which is inappropriate, because the sstable format chosen for writes
by Scylla might be different than highest_supported_sstable_format.

Use chosen_sstable_format instead.
2026-03-09 17:12:09 +01:00

355 lines
13 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import re
import time
import cassandra
import pytest
import requests
from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import ExecutionProfile
from cassandra.policies import RetryPolicy
from test.cluster.dtest.tools.misc import retry_till_success
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
# Record the driver version once at import time to help when reading test logs.
logger.debug("Python driver version in use: %s", cassandra.__version__)
class FlakyRetryPolicy(RetryPolicy):
    """
    A retry policy that retries a failed request up to ``max_retries`` times.

    Read timeouts, write timeouts and unavailable errors are all treated the
    same way: the request is retried while the ``retry_num`` keyword argument
    is below ``max_retries``; after that the error is rethrown.
    """

    # Maximum number of retries per request. Subclasses may override it.
    max_retries: int = 5

    def _retry_or_rethrow(self, description, retry_num):
        """
        Decide what to do with a failed request.
        :param description: Short text describing what is retried (used in the log message).
        :param retry_num: Number of retries already performed for this request.
        :return: ``(self.RETRY, None)`` while below the limit, ``(self.RETHROW, None)`` otherwise.
        """
        if retry_num < self.max_retries:
            logger.debug("Retrying %s. Attempt #%s", description, retry_num)
            # No explicit consistency level is supplied for the retry.
            return (self.RETRY, None)
        return (self.RETHROW, None)

    def on_read_timeout(self, *args, **kwargs):
        """Handle a read timeout; requires the ``retry_num`` keyword argument."""
        return self._retry_or_rethrow("read after timeout", kwargs["retry_num"])

    def on_write_timeout(self, *args, **kwargs):
        """Handle a write timeout; requires the ``retry_num`` keyword argument."""
        return self._retry_or_rethrow("write after timeout", kwargs["retry_num"])

    def on_unavailable(self, *args, **kwargs):
        """Handle an unavailable error (UE); requires the ``retry_num`` keyword argument."""
        return self._retry_or_rethrow("request after UE", kwargs["retry_num"])
def make_execution_profile(retry_policy=FlakyRetryPolicy(), consistency_level=ConsistencyLevel.ONE, **kwargs):
    """
    Build an ExecutionProfile with the defaults used by these tests.
    :param retry_policy: Retry policy for the profile. The default instance is
        created once, when this function is defined, and shared by all calls.
    :param consistency_level: Consistency level for the profile.
    :param kwargs: Any further keyword arguments, forwarded to ExecutionProfile unchanged.
    :return: The new ExecutionProfile.
    """
    profile = ExecutionProfile(
        retry_policy=retry_policy,
        consistency_level=consistency_level,
        **kwargs,
    )
    return profile
class WaitTimeoutExpiredError(Exception):
    """Raised by wait_for() when the timeout expires before the condition becomes true."""
def forever_wait_for(func, step=1, text=None, **kwargs):
    """
    Poll func until it returns a truthy value, with no upper time limit.
    Comparable to avocado.utils.wait.wait(), except that it never gives up.
    :param func: Function to evaluate.
    :param step: Seconds to sleep between two attempts.
    :param text: Text to log after every failed attempt, for debugging purposes.
    :param kwargs: Keyword arguments passed to func on every call.
    :return: The first truthy value returned by func.
    """
    began_at = time.time()
    while True:
        result = func(**kwargs)
        if result:
            return result
        waited = time.time() - began_at
        if text is not None:
            logger.debug(f"{text} ({waited} s)")
        time.sleep(step)
def wait_for(func, step=0.2, text=None, timeout=None, throw_exc=True, **kwargs):
    """
    Poll func until it returns a truthy value or the timeout expires.
    If no timeout is given (None or 0), 'forever_wait_for' is used instead and
    the call only returns once func succeeds.
    :param func: Function to evaluate.
    :param step: Time to sleep between attempts in seconds
    :param text: Text to log while waiting, for debug purposes
    :param timeout: Timeout in seconds
    :param throw_exc: Raise exception if timeout expired, but func result is not evaluated to True
    :param kwargs: Keyword arguments to func
    :return: Return value of func (the last, falsy one if the timeout expired
        and throw_exc is False).
    :raises WaitTimeoutExpiredError: if the timeout expired and throw_exc is True.
    """
    if not timeout:
        return forever_wait_for(func, step, text, **kwargs)
    # A monotonic clock keeps the timeout correct even if the wall clock is adjusted.
    start_time = time.monotonic()
    while True:
        ok = func(**kwargs)
        if ok:
            return ok
        time_elapsed = time.monotonic() - start_time
        if text is not None:
            logger.debug(f"{text} ({time_elapsed} s)")
        # The deadline is checked only after a failed attempt, so func always
        # runs at least once.
        if time_elapsed > timeout:
            err = f"Wait for: {text}: timeout - {timeout} seconds - expired"
            logger.debug(err)
            if throw_exc:
                raise WaitTimeoutExpiredError(err)
            return ok
        time.sleep(step)
class DtestTimeoutError(Exception):
    """Timeout error type for dtest code; it is not raised anywhere in this part of the file."""
def create_ks(session, name: str, rf: int | dict[str, int], tablets: int | None = None, replication_class: str = "NetworkTopologyStrategy"):
"""
Create a keyspace with the given name
rf: determines the replication factor.
- It can be given as an integer to set the `replication_factor` replication option
- or as a dict[str, int], like {"dc1": 1, "dc2": 2}, to explicitly set the replication factor per datacenter
tablets: is an optional parameter to control the initial number of tablets.
- Tablets are enabled by default, when supported (and tablets is None).
- Set tablets = 0 to disable tablets for the created keyspace
- Set tablets != 0 to set the initial number of tablets.
"""
query = "CREATE KEYSPACE %s WITH replication={%s}"
if isinstance(rf, int):
# we assume simpleStrategy without tablets
query = query % (name, f"'class':'{replication_class}', 'replication_factor':{rf}")
else:
assert len(rf) >= 0, "At least one datacenter/rf pair is needed"
# we assume networkTopologyStrategy
options = ", ".join(["'%s':%d" % (dc_value, rf_value) for dc_value, rf_value in rf.items()])
query = query % (name, "'class':'NetworkTopologyStrategy', %s" % options)
if tablets is not None:
if tablets:
query += " and tablets={'initial':%d}" % tablets
else:
query += " and tablets={'enabled':false}"
return create_ks_query(session=session, name=name, query=query)
def create_ks_query(session, name, query):
    """
    Execute a CREATE KEYSPACE statement and switch the session to the new keyspace.
    :param session: Driver session used to run the statements.
    :param name: Name of the keyspace; used for the final USE statement.
    :param query: The complete CREATE KEYSPACE statement to execute.
    """
    logger.debug("%s", query)
    try:
        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
    except cassandra.AlreadyExists:
        # Not fatal: log it and carry on with the schema agreement and USE below.
        logger.warning("AlreadyExists executing create ks query '%s'", query)
    control_connection = session.cluster.control_connection
    control_connection.wait_for_schema_agreement(wait_time=120)
    # USE doubles as a check that the keyspace really exists, because
    # OperationTimedOut was ignored above. Timeouts might happen some of the
    # time because CircleCI disk IO is unreliable and hangs randomly.
    session.execute(f"USE {name}")
def read_barrier(session):
    """Issue a read barrier through the given session.

    Attempting to drop a non-existing table is sufficient to trigger one.
    The `if exists` clause is required: without it the statement would fail
    in the prepare/validate step, which happens before the read barrier is
    performed.
    """
    barrier_statement = "drop table if exists nosuchkeyspace.nosuchtable"
    session.execute(barrier_statement)
def get_auth_provider(user, password):
    """
    Build a PlainTextAuthProvider for the given credentials.
    :param user: User name to authenticate with.
    :param password: Password of that user.
    :return: The PlainTextAuthProvider instance.
    """
    provider = PlainTextAuthProvider(username=user, password=password)
    return provider
def make_auth(user, password):
    """
    Build a callback that returns the same credentials for every node.
    :param user: User name to report.
    :param password: Password to report.
    :return: A function taking a node IP (which it ignores) and returning a
        fresh {"username": ..., "password": ...} dict on each call.
    """

    def private_auth(node_ip):
        credentials = {}
        credentials["username"] = user
        credentials["password"] = password
        return credentials

    return private_auth
def data_size(node, ks, cf):
    """
    Return the size in bytes of the given table on a node.
    The value is parsed from the output of `nodetool cfstats`, so this will
    break if the output format changes: it relies on exactly one line
    starting with the text "Space used (total)".
    :param node: Node on which the table size is checked
    :param ks: Keyspace name of the table
    :param cf: Table name
    :return: Data size in bytes, as a float
    :raises RuntimeError: if the expected line is missing, duplicated or malformed
    """
    cfstats = node.nodetool(f"cfstats {ks}.{cf}")[0]
    # Tabs are removed from every line before looking for the marker.
    untabbed_lines = (line.replace("\t", "") for line in cfstats.split("\n"))
    matching = [line for line in untabbed_lines if line.startswith("Space used (total)")]
    if len(matching) != 1:
        raise RuntimeError('Expected output from `nodetool cfstats` to contain exactly 1 line starting with "Space used (total)". Found:\n' + cfstats)
    fields = matching[0].split()
    # The line must split into exactly four fields; the last one is the number.
    if len(fields) != 4:
        raise RuntimeError("Expected format for `Space used (total)` in nodetool cfstats is `Space used (total): <number>`.Found:\n" + matching[0])
    return float(fields[3])
def get_port_from_node(node):
    """
    Return the port of the node's "binary" network interface.
    This is only used to connect the native driver, so the binary port is
    the only one of interest.
    :raises RuntimeError: if the port cannot be read from node.network_interfaces
    """
    try:
        binary_interface = node.network_interfaces["binary"]
        return binary_interface[1]
    except Exception:  # noqa: BLE001
        raise RuntimeError(f"No network interface defined on this node object. {node.network_interfaces}")
def get_ip_from_node(node):
    """
    Return the IP address of the node.
    The address of the "binary" interface is used when that entry is set;
    otherwise the address of the "thrift" interface is returned.
    """
    interface_name = "binary" if node.network_interfaces["binary"] else "thrift"
    return node.network_interfaces[interface_name][0]
def is_autocompaction_enabled(node, ks_name, table_name):
    """
    Ask a node over its REST API (port 10000) whether autocompaction is enabled for a table.
    :param node: node to send the API request to
    :param ks_name: Keyspace name of the table to check
    :param table_name: Name of the table to check
    :return: The decoded JSON body of the response (True|False)
    """
    host = get_ip_from_node(node=node)
    url = f"http://{host}:10000/column_family/autocompaction/{ks_name}:{table_name}"
    reply = requests.get(url)
    # Turn HTTP error statuses into exceptions instead of parsing an error body.
    reply.raise_for_status()
    return reply.json()
def chosen_sstable_format(node):
    """
    Query a node's REST API (port 10000) for its chosen sstable version.
    :param node: node to send the API request to
    :return: The decoded JSON body of the `/system/chosen_sstable_version` response
    """
    host = get_ip_from_node(node=node)
    url = f"http://{host}:10000/system/chosen_sstable_version"
    reply = requests.get(url)
    # Turn HTTP error statuses into exceptions instead of parsing an error body.
    reply.raise_for_status()
    return reply.json()
class Tester:
    """
    Test helper class (presumably the base of the dtest test classes - confirm
    against its users). Attributes that are missing on the instance are looked
    up on the `fixture_dtest_setup` object stored by the autouse fixture below.
    """

    def __getattribute__(self, name):
        """Return the attribute from this object, falling back to `fixture_dtest_setup`."""
        try:
            return object.__getattribute__(self, name)
        except AttributeError:
            # Fetch the setup object via object.__getattribute__ as well, so
            # that this override is not re-entered during the fallback.
            dtest_setup = object.__getattribute__(self, "fixture_dtest_setup")
            return object.__getattribute__(dtest_setup, name)

    @pytest.fixture(scope="function", autouse=True)
    def set_dtest_setup_on_function(self, fixture_dtest_setup):
        """Store the `fixture_dtest_setup` fixture and its dtest_config on the instance."""
        self.fixture_dtest_setup = fixture_dtest_setup
        self.dtest_config = fixture_dtest_setup.dtest_config
# We default to UTF8Type because it's simpler to use in tests
def create_cf(  # noqa: PLR0912, PLR0913
    session,
    name,
    key_type="varchar",
    speculative_retry=None,
    read_repair=None,
    compression=None,
    gc_grace=None,
    columns=None,
    validation="UTF8Type",
    compact_storage=False,
    compaction_strategy="SizeTieredCompactionStrategy",
    primary_key=None,
    clustering=None,
    default_ttl=None,
    compaction=None,
    debug_query=False,
    caching=True,
    paxos_grace_seconds=None,
    dclocal_read_repair_chance=None,
    scylla_encryption_options=None,
    key_name: str = "key",
):
    """
    Build and execute a CREATE COLUMNFAMILY statement, then check the table is readable.

    Schema selection:
    - without `columns` (None or empty) the table is
      `(<key_name> <key_type>, c varchar, v varchar, PRIMARY KEY(<key_name>, c))`;
      `primary_key` is not used in this case;
    - with `columns` and `primary_key`, the columns are appended after the key
      column and `primary_key` is used verbatim inside `PRIMARY KEY(...)`;
    - with `columns` only, the key column alone is the primary key.

    :param session: Driver session used to run the statements.
    :param name: Table name, used verbatim in the statements.
    :param key_type: CQL type of the key column.
    :param speculative_retry: Value for the `speculative_retry` option (quoted).
    :param read_repair: Value for `read_repair_chance`, formatted as a float.
    :param compression: Compressor name without the `Compressor` suffix; when None,
        `compression = {}` is emitted.
    :param gc_grace: Value for `gc_grace_seconds` (integer).
    :param columns: Mapping of additional column name -> CQL type.
    :param validation: Accepted but not used by this function.
    :param compact_storage: Append `COMPACT STORAGE` when true.
    :param compaction_strategy: Compaction class name; ignored when `compaction` is given.
    :param primary_key: Primary key expression, only used together with `columns`.
    :param clustering: Expression for `CLUSTERING ORDER BY (...)`.
    :param default_ttl: Value for `default_time_to_live` (integer).
    :param compaction: Complete compaction option value; takes precedence over
        `compaction_strategy`.
    :param debug_query: Log the final statement at debug level before executing it.
    :param caching: When false, append `caching = {'enabled':false}`.
    :param paxos_grace_seconds: Value for `paxos_grace_seconds` (integer).
    :param dclocal_read_repair_chance: Value for `dclocal_read_repair_chance` (quoted).
    :param scylla_encryption_options: Value for `scylla_encryption_options`, used verbatim.
    :param key_name: Name of the key column.
    """
    additional_columns = ""
    if columns is not None:
        additional_columns = "".join(f", {col_name} {col_type}" for col_name, col_type in columns.items())

    if additional_columns == "":
        # The primary key must reference the actual key column name, not a literal "key".
        schema = f"{key_name} {key_type}, c varchar, v varchar, PRIMARY KEY({key_name}, c)"
    elif primary_key:
        schema = f"{key_name} {key_type}{additional_columns}, PRIMARY KEY({primary_key})"
    else:
        schema = f"{key_name} {key_type} PRIMARY KEY{additional_columns}"

    # Table options, in the order they appear in the statement.
    options = ["comment='test cf'"]
    if compaction is not None:
        options.append(f"compaction={compaction}")
    elif compaction_strategy:
        options.append("compaction = {'class': '%s', 'enabled': 'true'}" % compaction_strategy)
    if clustering:
        options.append(f"CLUSTERING ORDER BY ({clustering})")
    if compression is not None:
        options.append(f"compression = {{ 'sstable_compression': '{compression}Compressor' }}")
    else:
        # if a compression option is omitted, C* will default to lz4 compression
        options.append("compression = {}")
    if read_repair is not None:
        options.append(f"read_repair_chance={read_repair:f}")
    if dclocal_read_repair_chance is not None:
        options.append(f"dclocal_read_repair_chance='{dclocal_read_repair_chance}'")
    if gc_grace is not None:
        options.append("gc_grace_seconds=%d" % gc_grace)
    if default_ttl is not None:
        options.append("default_time_to_live=%d" % default_ttl)
    if speculative_retry is not None:
        options.append(f"speculative_retry='{speculative_retry}'")
    if compact_storage:
        options.append("COMPACT STORAGE")
    if scylla_encryption_options is not None:
        options.append(f"scylla_encryption_options={scylla_encryption_options}")
    if not caching:
        options.append("caching = {'enabled':false}")
    if paxos_grace_seconds is not None:
        options.append("paxos_grace_seconds=%d" % paxos_grace_seconds)

    query = f"CREATE COLUMNFAMILY {name} ({schema}) WITH " + " AND ".join(options)

    # Log only once the statement is complete, so the log shows exactly what is executed.
    if debug_query:
        logger.debug(query)

    try:
        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
    except cassandra.AlreadyExists:
        logger.warning("AlreadyExists executing create cf query '%s'", query)
    session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
    # Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created
    session.execute("SELECT * FROM %s LIMIT 1" % name)