Migrate several functionalities from dtest. These will be used by the schema_management_test.py tests when they are enabled. Refs #26932
146 lines
5.8 KiB
Python
146 lines
5.8 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import datetime
|
|
import logging
|
|
from concurrent.futures.thread import ThreadPoolExecutor
|
|
from itertools import groupby
|
|
|
|
from cassandra import ConsistencyLevel
|
|
from cassandra.query import SimpleStatement
|
|
from cassandra.concurrent import execute_concurrent_with_args
|
|
|
|
from test.cluster.dtest.dtest_class import create_cf
|
|
from test.cluster.dtest.tools import assertions
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_c1c2_table(session, cf="cf", read_repair=None, debug_query=True, compaction=None, caching=True, speculative_retry=None):  # noqa: PLR0913
    """Create a table *cf* with two text columns c1/c2 via the shared create_cf helper.

    All table-level options are forwarded unchanged to create_cf.
    """
    table_options = {
        "read_repair": read_repair,
        "debug_query": debug_query,
        "compaction": compaction,
        "caching": caching,
        "speculative_retry": speculative_retry,
    }
    create_cf(session, cf, columns={"c1": "text", "c2": "text"}, **table_options)
|
|
|
|
|
|
def insert_c1c2(  # noqa: PLR0913
    session,
    keys=None,
    n=None,
    consistency=ConsistencyLevel.QUORUM,
    c1_values=None,
    c2_values=None,
    ks="ks",
    cf="cf",
    concurrency=20,
):
    """Insert rows keyed 'k<key>' into <ks>.<cf> concurrently.

    Exactly one of ``keys`` (iterable of key suffixes) or ``n`` (use keys
    0..n-1) must be provided.  When ``c1_values``/``c2_values`` are given,
    they are zipped positionally with the keys; otherwise the literals
    'value1'/'value2' are written.

    :param session: driver session to execute on
    :param consistency: write consistency level (default QUORUM)
    :param concurrency: number of in-flight requests for execute_concurrent_with_args
    :raises ValueError: if neither or both of ``keys``/``n`` are given, or if
        only one of ``c1_values``/``c2_values`` is given
    """
    if (keys is None and n is None) or (keys is not None and n is not None):
        raise ValueError(f"Expected exactly one of 'keys' or 'n' arguments to not be None; got keys={keys}, n={n}")
    if (not c1_values and c2_values) or (c1_values and not c2_values):
        raise ValueError('Expected the "c1_values" and "c2_values" variables be empty or contain list of string')
    # Test identity (is not None), not truthiness: n=0 passes the validation
    # above but the old `if n:` left keys=None, crashing in the loops below.
    if n is not None:
        keys = list(range(n))
    if c1_values and c2_values:
        statement = session.prepare(f"INSERT INTO {ks}.{cf} (key, c1, c2) VALUES (?, ?, ?)")
        statement.consistency_level = consistency
        execute_concurrent_with_args(session, statement, map(lambda x, y, z: [f"k{x}", y, z], keys, c1_values, c2_values), concurrency=concurrency)
    else:
        statement = session.prepare(f"INSERT INTO {ks}.{cf} (key, c1, c2) VALUES (?, 'value1', 'value2')")
        statement.consistency_level = consistency
        execute_concurrent_with_args(session, statement, [[f"k{k}"] for k in keys], concurrency=concurrency)
|
|
|
|
|
|
def query_c1c2(  # noqa: PLR0913
    session,
    key,
    consistency=ConsistencyLevel.QUORUM,
    tolerate_missing=False,
    must_be_missing=False,
    c1_value="value1",
    c2_value="value2",
    ks="ks",
    cf="cf",
):
    """Read row 'k<key>' from <ks>.<cf> and verify its c1/c2 values.

    With must_be_missing=True the row must be absent; with
    tolerate_missing=True an absent or present row is accepted without
    value checks.  Otherwise exactly one row with (c1_value, c2_value)
    is required.
    """
    statement = SimpleStatement(f"SELECT c1, c2 FROM {ks}.{cf} WHERE key='k{key}'", consistency_level=consistency)
    rows = list(session.execute(statement))
    if must_be_missing:
        assertions.assert_length_equal(rows, 0)
        return
    if tolerate_missing:
        return
    assertions.assert_length_equal(rows, 1)
    row = rows[0]
    assert len(row) == 2 and row[0] == c1_value and row[1] == c2_value, row
|
|
|
|
|
|
def rows_to_list(rows):
    """Convert an iterable of result rows into a plain list of lists."""
    return list(map(list, rows))
|
|
|
|
|
|
def get_list_res(session, query, cl, ignore_order=False, result_as_string=False, timeout=None):
    """Execute *query* at consistency *cl* and return the rows as a list of lists.

    ignore_order sorts the rows before returning; result_as_string returns
    the str() of the (possibly sorted) list instead of the list itself.
    A timeout is forwarded to session.execute only when explicitly given.
    """
    statement = SimpleStatement(query, consistency_level=cl)
    execute_kwargs = {} if timeout is None else {"timeout": timeout}
    rows = rows_to_list(session.execute(statement, **execute_kwargs))
    if ignore_order:
        rows = sorted(rows)
    return str(rows) if result_as_string else rows
|
|
|
|
|
|
def run_in_parallel(functions_list):
    """Run the given functions in parallel, one thread each, and collect results.

    :param functions_list: list of dictionaries with thread definitions. Expected structure:
        [{'func': <function pointer - the function will be run from the thread>,
          'args': (arg1, arg2, arg3),  # explicit function arguments by order in the function
          'kwargs': {<arg name1>: value, <arg name2>: value},  # function arguments by name
         },  # first thread definition
         {'func': <function pointer>, 'args': (), 'kwargs': {}},  # second thread, no arguments
        ]
    :return: list of the functions' return values, in submission order
    :rtype: list
    """
    if not functions_list:
        # ThreadPoolExecutor rejects max_workers=0; nothing to run anyway.
        return []
    logger.debug(f"Threads start at {datetime.datetime.now()}")
    # Context manager joins and releases the worker threads even if a task
    # raises (the old code leaked the executor - it was never shut down).
    with ThreadPoolExecutor(max_workers=len(functions_list)) as pool:
        tasks = [pool.submit(spec["func"], *spec.get("args", []), **spec.get("kwargs", {})) for spec in functions_list]
        # Future.result() re-raises the first exception raised inside a task.
        results = [task.result() for task in tasks]
    logger.debug(f"'{len(results)}' threads finished at {datetime.datetime.now()}")
    return results
|
|
|
|
|
|
def run_query_with_data_processing(
    session,
    query,
    consistency_level=ConsistencyLevel.ONE,
    session_timeout=None,
    group=False,
    groupby_column=None,
    restrict_column=None,
    restrict_value=None,
):
    """Run *query* and optionally filter and/or group-count the result rows.

    With restrict_column/restrict_value, keep only rows whose value in
    restrict_column is in restrict_value (a scalar is wrapped into a list).
    With group=True, project rows onto groupby_column (after any filtering)
    and return [value, count] pairs for each distinct value.
    Otherwise return the raw driver rows as a list.

    NOTE(review): passing restrict_column without restrict_value turns
    restrict_value into [None] below, i.e. it filters for rows whose column
    IS None rather than skipping the filter — confirm this is intended.
    """
    # Treat 0/None the same: fall back to a 120s driver timeout.
    if not session_timeout:
        session_timeout = 120
    result = list(session.execute(SimpleStatement(query, consistency_level=consistency_level), timeout=session_timeout))
    if result:
        if restrict_column:
            # Column positions are looked up by name in the row's _fields
            # (driver rows are namedtuple-like); raises StopIteration if the
            # column name is not present in the result set.
            restrict_column_index = next(i for i, clmn in enumerate(result[0]._fields) if clmn == restrict_column)
            restrict_value = [restrict_value] if not isinstance(restrict_value, list) else restrict_value

        if group:
            groupby_column_index = next(i for i, clmn in enumerate(result[0]._fields) if clmn == groupby_column)
            # Project onto the group-by column, applying the restriction first
            # when one was requested.
            result = [item[groupby_column_index] for item in result if item[restrict_column_index] in restrict_value] if restrict_value and restrict_column else [item[groupby_column_index] for item in result]
            # itertools.groupby requires sorted input; emit [value, count] pairs.
            result = [[key, len(list(group))] for key, group in groupby(sorted(result))]
        elif restrict_value and restrict_column:
            # Filter-only path: keep whole rows matching the restriction.
            result = [item for item in result if item[restrict_column_index] in restrict_value]
    return result
|