Drop the AGPL license in favor of a source-available license. See the blog post [1] for details. [1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
396 lines
16 KiB
Python
396 lines
16 KiB
Python
#
|
|
# Copyright (C) 2022-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
"""This module provides helper classes to manage CQL tables, perform random schema changes,
|
|
and verify expected current schema.
|
|
|
|
Classes:
|
|
RandomTables
|
|
A list of managed tables stored in self.tables.
|
|
.add_tables() creates multiple (ntables) random tables with (ncolumns) random columns.
|
|
.add_table() creates a table with the specified number of randomly typed columns, or a custom table if
|
|
given a list of columns.
|
|
Provides list access by position with [pos].
|
|
Custom tables can be .append()ed.
|
|
A list of tables can be merged with extend().
|
|
drop_table() drops a specified table, given by name or as a RandomTable instance.
|
|
drop_all() drops the keyspace, and with it all tables.
|
|
verify_schema() checks expected schema for all managed and active tables.
|
|
.removed_tables keeps previous tables after dropping them.
|
|
|
|
RandomTable
|
|
A managed table.
|
|
Column
|
|
Manage a table's column and generate a value from a seed.
|
|
Usually tests should generate deterministic sequential values.
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
from abc import ABCMeta
|
|
import asyncio
|
|
import itertools
|
|
import logging
|
|
import random
|
|
import uuid
|
|
import time
|
|
from typing import Optional, Type, List, Set, Union, TYPE_CHECKING
|
|
if TYPE_CHECKING:
|
|
from cassandra.cluster import Session as CassandraSession # type: ignore
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import get_host_api_address, read_barrier
|
|
from test.pylib.util import get_available_host
|
|
|
|
|
|
# Module-wide logger; the CQL statements issued by the helpers in this module are logged at DEBUG.
logger: logging.Logger = logging.getLogger('random_tables')
|
|
|
|
|
|
class ColumnNotFound(Exception):
    """Raised when a column looked up by name is not part of a managed table."""
|
|
|
|
|
|
class ValueType(metaclass=ABCMeta):
    """Base class for the CQL column value types used by random tables.

    Subclasses set ``name`` to the CQL type name and override ``val()``.
    """

    # CQL type name, e.g. 'int'; empty on the base class
    name: str = ""

    def val(self, seed: int):
        """Return the value of this type derived from ``seed`` (the base returns None)."""
        return None
|
|
|
|
|
|
class IntType(ValueType):
    """CQL ``int`` column type: a seed maps to itself."""

    def __init__(self):
        self.name: str = 'int'

    def val(self, seed: int) -> int:
        """Return ``seed`` unchanged."""
        result = seed
        return result
|
|
|
|
|
|
class TextType(ValueType):
    """CQL ``text`` column type: a seed maps to its ``str()`` form."""

    def __init__(self):
        self.name: str = 'text'

    def val(self, seed) -> str:
        """Return ``str(seed)``."""
        text = str(seed)
        return text
|
|
|
|
|
|
class FloatType(ValueType):
    """CQL ``float`` column type: a seed maps to ``float(seed)``."""

    def __init__(self):
        self.name: str = 'float'

    def val(self, seed: int) -> float:
        """Return ``seed`` converted to float."""
        as_float = float(seed)
        return as_float
|
|
|
|
|
|
class UUIDType(ValueType):
    """CQL ``uuid`` column type: a seed maps to a UUID whose last group encodes it."""

    def __init__(self):
        self.name: str = 'uuid'

    def val(self, seed: int) -> uuid.UUID:
        """Return a UUID whose last 12-character group is ``seed`` zero-padded.

        The padded decimal digits are parsed by ``uuid.UUID`` as hex digits.
        """
        node_digits = f"{seed:012}"
        return uuid.UUID("{00000000-0000-0000-0000-" + node_digits + "}")
|
|
|
|
|
|
class CounterType(ValueType):
    """CQL ``counter`` column type: a seed maps to itself."""

    def __init__(self):
        self.name: str = 'counter'

    def val(self, seed: int) -> int:
        """Return ``seed`` unchanged."""
        counter_value = seed
        return counter_value
|
|
|
|
|
|
class Column:
    """Definition of a single table column.

    When no value type class is given, one of int, text, float or uuid is
    picked at random. Collection and user-defined types are not supported."""

    def __init__(self, name: str, ctype: Optional[Type[ValueType]] = None):
        self.name: str = name
        type_cls = ctype if ctype is not None else random.choice([IntType, TextType, FloatType, UUIDType])
        self.ctype = type_cls()
        # "<name> <type>" fragment, as used for the column list of CREATE TABLE
        self.cql: str = f"{self.name} {self.ctype.name}"

    def val(self, seed):
        """Return this column's value for ``seed`` (deterministic, delegated to the value type)."""
        return self.ctype.val(seed)

    def __str__(self):
        return self.name
|
|
|
|
|
|
class RandomTable():
    """A managed random table.

    The first ``pks`` columns form the primary key: column 0 is the partition
    key and columns 1..pks-1 are clustering columns; the remaining columns are
    regular value columns.
    """
    # Sequential unique id, shared by all RandomTable instances (used for default names)
    newid = itertools.count(start=1).__next__

    def __init__(self, manager: ManagerClient, keyspace: str, ncolumns: Optional[int] = None,
                 columns: Optional[List[Column]] = None, pks: int = 2, name: Optional[str] = None):
        """Set up a new table definition; nothing is sent to the cluster until create().

        Args:
            manager: client whose ``cql`` session executes the statements.
            keyspace: keyspace of the table.
            ncolumns: total number of columns to generate when ``columns`` is None:
                ``pk``, then ``pks - 1`` text clustering columns ``c_NN``, then
                randomly typed value columns ``v_NN``. Must be greater than ``pks``.
            columns: explicit column definitions used instead of generated ones;
                there must be more than ``pks`` of them.
            pks: number of primary key columns (the first ``pks`` columns).
            name: table name; defaults to ``t_NN`` built from a sequential id.
        """
        self.id: int = RandomTable.newid()
        self.manager: ManagerClient = manager
        self.keyspace: str = keyspace
        self.name: str = name if name is not None else f"t_{self.id:02}"
        self.full_name: str = keyspace + "." + self.name
        self.next_clustering_id = itertools.count(start=1).__next__
        self.next_value_id = itertools.count(start=1).__next__
        # TODO: assumes primary key is composed of first self.pks columns
        self.pks = pks

        if columns is not None:
            assert len(columns) > pks, "Not enough value columns provided"
            self.columns = columns
        else:
            assert isinstance(ncolumns, int) and ncolumns > pks, "Not enough value columns provided"
            # Primary key pk, clustering columns c_xx (text), value columns v_xx (random types)
            self.columns = [Column("pk")]
            self.columns += [Column(f"c_{self.next_clustering_id():02}", ctype=TextType)
                             for _ in range(1, pks)]
            self.columns += [Column(f"v_{self.next_value_id():02}")
                             for _ in range(1, ncolumns - pks + 1)]

        self.removed_columns: List[Column] = []
        # Counter for sequential values to insert
        self.next_seq = itertools.count(start=1).__next__
        self.next_idx_id = itertools.count(start=1).__next__
        self.indexes: Set[str] = set()
        self.removed_indexes: Set[str] = set()

    @property
    def all_col_names(self) -> str:
        """Get all column names comma separated for CQL query generation convenience"""
        return ", ".join([c.name for c in self.columns])

    def _create_stmt(self, if_not_exists: bool = False) -> str:
        """Build the CREATE TABLE statement for the current column definitions."""
        col_defs = ", ".join(c.cql for c in self.columns)
        pk_names = ", ".join(c.name for c in self.columns[:self.pks])
        if_not_exists_clause = "IF NOT EXISTS " if if_not_exists else ""
        return f"CREATE TABLE {if_not_exists_clause}{self.full_name} ({col_defs}, primary key({pk_names}))"

    async def create(self, if_not_exists: bool = False) -> asyncio.Future:
        """Create this table, optionally with IF NOT EXISTS; returns the run_async result."""
        cql_stmt = self._create_stmt(if_not_exists)
        logger.debug(cql_stmt)
        assert self.manager.cql is not None
        return await self.manager.cql.run_async(cql_stmt)

    async def drop(self, if_exists: bool = False) -> asyncio.Future:
        """Drop this table, optionally with IF EXISTS; returns the run_async result."""
        cql_stmt = f"DROP TABLE {'IF EXISTS ' if if_exists else ''}{self.full_name}"
        logger.debug(cql_stmt)
        assert self.manager.cql is not None
        return await self.manager.cql.run_async(cql_stmt)

    async def add_column(self, name: Optional[str] = None, ctype: Optional[Type[ValueType]] = None,
                         column: Optional[Column] = None):
        """Add a value column to the table.

        Either pass a ready ``column``, or let one be built from ``name``
        (default ``v_NN``) and ``ctype`` (default TextType). The column is
        appended to ``self.columns`` before the ALTER TABLE is issued.
        """
        if column is not None:
            assert type(column) is Column, "Wrong column type to add_column"
        else:
            name = name if name is not None else f"v_{self.next_value_id():02}"
            ctype = ctype if ctype is not None else TextType
            column = Column(name, ctype=ctype)
        self.columns.append(column)
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} "
                                         f"ADD {column.name} {column.ctype.name}")

    async def drop_column(self, column: Union[Column, str, int, None] = None):
        """Drop a value column and remember it in ``self.removed_columns``.

        ``column`` may be None (a random value column), a position (int, must
        not be a primary key position), a column name (str) or a Column of this
        table. At least one value column must remain.

        Raises:
            ColumnNotFound: if no column has the given name.
        """
        if column is None:
            col = random.choice(self.columns[self.pks:])
        elif type(column) is int:  # exact type check: bools are not positions
            assert column >= self.pks, f"Cannot remove {self.name} PK column at pos {column}"
            col = self.columns[column]
        elif type(column) is str:
            try:
                col = next(c for c in self.columns if c.name == column)
            except StopIteration:
                raise ColumnNotFound(f"Column {column} not found in table {self.name}") from None
        else:
            assert type(column) is Column, f"can not remove unknown type {type(column)}"
            assert column in self.columns, f"column {column.name} not present"
            col = column
        assert len(self.columns) - 1 > self.pks, f"Cannot remove last value column {col.name} from {self.name}"
        self.columns.remove(col)
        self.removed_columns.append(col)
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} DROP {col.name}")

    async def insert_seq(self) -> asyncio.Future:
        """Insert a row whose values are all derived from the next sequence number."""
        seed = self.next_seq()
        placeholders = ", ".join(["%s"] * len(self.columns))
        assert self.manager.cql is not None
        return await self.manager.cql.run_async(f"INSERT INTO {self.full_name} ({self.all_col_names}) "
                                                f"VALUES ({placeholders})",
                                                parameters=[c.val(seed) for c in self.columns])

    async def add_index(self, column: Union[Column, str, int], name: Optional[str] = None) -> str:
        """Create a secondary index on a column and return the index name.

        ``column`` may be a position (int, not the partition key at 0), a column
        name (str) or a Column of this table. The default index name is
        ``<table>_<column>_NN``.

        Raises:
            TypeError: if ``column`` is of any other type.
        """
        if isinstance(column, int):
            assert column > 0, f"Cannot create secondary index " \
                               f"on partition key column {self.columns[0].name}"
            col_name = self.columns[column].name
        elif isinstance(column, str):
            col_name = column
        elif isinstance(column, Column):
            assert column in self.columns
            col_name = column.name
        else:
            raise TypeError(f"Wrong column type {type(column)} given to add_index")

        name = name if name is not None else f"{self.name}_{col_name}_{self.next_idx_id():02}"
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"CREATE INDEX {name} on {self.full_name} ({col_name})")
        self.indexes.add(name)
        return name

    async def drop_index(self, name: str) -> None:
        """Drop a managed index by name and remember it in ``self.removed_indexes``.

        Raises KeyError (from set.remove) if ``name`` is not a managed index.
        """
        self.indexes.remove(name)
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"DROP INDEX {self.keyspace}.{name}")
        self.removed_indexes.add(name)

    async def enable_cdc(self) -> None:
        """Enable CDC on this table."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} WITH cdc = {{ 'enabled' : true }}")

    async def disable_cdc(self) -> None:
        """Disable CDC on this table."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} WITH cdc = {{ 'enabled' : false }}")

    def __str__(self):
        return self.full_name
|
|
|
|
|
|
class RandomTables():
    """A list of managed random tables living in one keyspace."""

    def __init__(self, test_name: str, manager: ManagerClient, keyspace: str,
                 replication_factor: int,
                 dc_replication_factor: Optional[dict[str, int]] = None,
                 enable_tablets: None | bool = None):
        """Create the keyspace if it does not exist; start with no managed tables.

        Args:
            test_name: name of the test using these tables (kept for reference).
            manager: client whose ``cql`` session executes the statements.
            keyspace: keyspace name.
            replication_factor: NetworkTopologyStrategy 'replication_factor'.
            dc_replication_factor: optional per-datacenter replication factors.
            enable_tablets: when not None, adds ``AND TABLETS = {'enabled': ...}``.
        """
        keyspace_query = self._keyspace_query(keyspace, replication_factor,
                                              dc_replication_factor, enable_tablets)
        self.test_name = test_name
        self.manager = manager
        self.keyspace = keyspace
        self.tables: List[RandomTable] = []
        self.removed_tables: List[RandomTable] = []
        assert self.manager.cql is not None
        self.manager.cql.execute(keyspace_query)

    @staticmethod
    def _keyspace_query(keyspace: str, replication_factor: int,
                        dc_replication_factor: Optional[dict[str, int]] = None,
                        enable_tablets: Optional[bool] = None) -> str:
        """Build the CREATE KEYSPACE IF NOT EXISTS statement.

        Per-DC factors are added to the replication map before the optional
        TABLETS clause is appended, so they always land in the replication
        options (never inside the TABLETS map).
        """
        replication = f"{{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor}"
        for dc, rf in (dc_replication_factor or {}).items():
            replication += f",'{dc}': {rf}"
        replication += "}"
        query = f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH REPLICATION = {replication}"
        if enable_tablets is not None:
            query += f" AND TABLETS = {{'enabled': {str(enable_tablets).lower()} }}"
        return query

    async def add_tables(self, ntables: int = 1, ncolumns: int = 5, if_not_exists: bool = False) -> None:
        """Create ``ntables`` random tables concurrently and add them to the list.

        ncolumns is the total number of columns per table, primary key columns included.
        """
        tables = [RandomTable(self.manager, self.keyspace, ncolumns) for _ in range(ntables)]
        await asyncio.gather(*(t.create(if_not_exists) for t in tables))
        self.tables.extend(tables)

    async def add_table(self, ncolumns: Optional[int] = None, columns: Optional[List[Column]] = None,
                        pks: int = 2, name: Optional[str] = None,
                        if_not_exists: bool = False) -> RandomTable:
        """Create one table and add it to the list. See random_tables.RandomTable()"""
        table = RandomTable(self.manager, self.keyspace, ncolumns=ncolumns, columns=columns,
                            pks=pks, name=name)
        await table.create(if_not_exists)
        self.tables.append(table)
        return table

    async def add_udt(self, name: str, cmd: str) -> None:
        """Create user-defined type ``name`` in the keyspace; ``cmd`` is the field list."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"CREATE TYPE {self.keyspace}.{name} {cmd}")

    async def drop_udt(self, name) -> None:
        """Drop user-defined type ``name`` from the keyspace."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"DROP TYPE {self.keyspace}.{name}")

    def __getitem__(self, pos: int) -> RandomTable:
        return self.tables[pos]

    def append(self, table: RandomTable) -> None:
        """Add an already created table to the managed list."""
        self.tables.append(table)

    def extend(self, tables: List[RandomTable]) -> None:
        """Add already created tables to the managed list."""
        self.tables.extend(tables)

    async def drop_table(self, table: Union[str, RandomTable], if_exists: bool = False) -> RandomTable:
        """Drop a managed table given by (optionally keyspace-qualified) name or instance.

        The table is moved from ``self.tables`` to ``self.removed_tables``.

        Raises:
            ValueError: if no managed table has the given name.
        """
        if isinstance(table, str):
            # next() with a default: a StopIteration escaping a coroutine would
            # surface as an obscure RuntimeError (PEP 479).
            found = next((t for t in self.tables if table in (t.name, t.full_name)), None)
            if found is None:
                raise ValueError(f"Table {table} not found in keyspace {self.keyspace}")
            table = found
        else:
            assert isinstance(table, RandomTable), f"Invalid table type {type(table)}"
        await table.drop(if_exists)
        self.tables.remove(table)
        self.removed_tables.append(table)
        return table

    def drop_all(self) -> None:
        """Drop keyspace (and tables)"""
        assert self.manager.cql is not None
        self.manager.cql.execute(f"DROP KEYSPACE {self.keyspace}")

    @staticmethod
    def _expected_kind_and_position(pos: int, pks: int) -> tuple[str, int]:
        """Expected system_schema.columns (kind, position) of the column at index ``pos``.

        Column 0 is the single partition key column; columns 1..pks-1 are
        clustering columns, numbered from 0 within the clustering key; all
        others are regular columns with position -1.
        """
        if pos == 0:
            return "partition_key", 0
        if pos < pks:
            return "clustering", pos - 1
        return "regular", -1

    async def verify_schema(self, table: Union[RandomTable, str, None] = None,
                            do_read_barrier: bool = True) -> None:
        """Verify the cluster schema against the managed tables' definitions.

        Args:
            table: a RandomTable, or a table name (optionally ``keyspace.``-qualified),
                to verify just that table; by default all active managed tables.
            do_read_barrier: issue a read barrier on the queried node first.

        Checks that the tables exist, that column names and types match, and
        that each column's kind and position match the primary key layout.
        """
        tables_stmt = f"SELECT table_name FROM system_schema.tables " \
                      f"WHERE keyspace_name = '{self.keyspace}'"
        if isinstance(table, RandomTable):
            tables = {table.name}
            cql_stmt1 = f"{tables_stmt} AND table_name = '{table.name}'"
        elif isinstance(table, str):
            if table.startswith(f"{self.keyspace}."):
                table = table[len(self.keyspace) + 1:]
            tables = {table}
            cql_stmt1 = f"{tables_stmt} AND table_name = '{table}'"
        else:
            tables = {t.name for t in self.tables}
            cql_stmt1 = tables_stmt

        logger.debug(cql_stmt1)

        cql = self.manager.cql
        assert cql

        host = await get_available_host(cql, time.time() + 60)
        if do_read_barrier:
            # Issue a read barrier on some node and then keep using that node to do the queries.
            # This ensures that the queries return recent data (at least all data committed
            # when `verify_schema` was called).
            await read_barrier(self.manager.api, get_host_api_address(host))

        res1 = {row.table_name for row in await cql.run_async(cql_stmt1, host=host)}
        assert not tables - res1, f"Tables {tables - res1} not present"

        for table_name in tables:
            managed = next((t for t in self.tables if t.name == table_name), None)
            assert managed is not None, f"Table {table_name} is not a managed table"
            cols = {c.name: c for c in managed.columns}
            c_pos = {c.name: i for i, c in enumerate(managed.columns)}
            cql_stmt2 = f"SELECT column_name, position, kind, type FROM system_schema.columns " \
                        f"WHERE keyspace_name = '{self.keyspace}' AND table_name = '{table_name}'"
            logger.debug(cql_stmt2)
            res2 = {row.column_name: row for row in await cql.run_async(cql_stmt2, host=host)}
            assert res2.keys() == cols.keys(), f"Column names for {table_name} do not match " \
                                               f"expected ({', '.join(cols.keys())}) " \
                                               f"got ({', '.join(res2.keys())})"
            for c_name, c in res2.items():
                col = cols[c_name]
                assert c.type == col.ctype.name, f"Column {c_name} type does not match " \
                                                 f"{c.type} {col.ctype.name}"
                kind, schema_pos = self._expected_kind_and_position(c_pos[c_name], managed.pks)
                assert c.kind == kind, f"Column {c_name} kind does not match {c.kind} {kind}"
                assert c.position == schema_pos, f"Column {c_name} position {c.position} " \
                                                 f"does not match {schema_pos}"
|