Files
scylladb/test/pylib/random_tables.py
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

396 lines
16 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""This module provides helper classes to manage CQL tables, perform random schema changes,
and verify expected current schema.
Classes:
RandomTables
A list of managed tables stored in self.tables.
.add_tables() creates multiple (ntables) random tables with (ncolumns) random columns.
.add_table() creates a table with the specified number of random-type columns, or a custom
table if given a list of columns.
Provides list access by position with [pos].
Custom tables can be .append()ed.
A list of tables can be merged with extend().
.drop_table() drops a managed table, given by name or by RandomTable instance.
.drop_all() drops the whole keyspace, including its tables.
.verify_schema() checks the expected schema of all active managed tables, or of a single given table.
.removed_tables keeps previous tables after dropping them.
RandomTable
A managed table.
Column
Manage a table's column and generate a value from a seed.
Usually tests should generate deterministic sequential values.
"""
from __future__ import annotations
from abc import ABCMeta
import asyncio
import itertools
import logging
import random
import uuid
import time
from typing import Optional, Type, List, Set, Union, TYPE_CHECKING
if TYPE_CHECKING:
from cassandra.cluster import Session as CassandraSession # type: ignore
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.util import get_available_host
# Module-wide logger; the generated CQL statements are logged through it at DEBUG level.
logger: logging.Logger = logging.getLogger('random_tables')
class ColumnNotFound(Exception):
    """Raised when a column looked up by name is not part of a managed table."""
class ValueType(metaclass=ABCMeta):
    """Base class for the CQL value types a Column can hold.

    Subclasses set ``name`` to the CQL type name and override ``val``.
    """
    name: str = ""

    def val(self, seed: int):
        """Return the value of this type derived from ``seed``.

        The base implementation yields ``None``.
        """
        return None
class IntType(ValueType):
    """CQL ``int``: the value is the seed itself."""

    def __init__(self) -> None:
        self.name: str = 'int'

    def val(self, seed: int) -> int:
        value = seed
        return value
class TextType(ValueType):
    """CQL ``text``: the value is the string form of the seed."""

    def __init__(self) -> None:
        self.name: str = 'text'

    def val(self, seed) -> str:
        as_text = str(seed)
        return as_text
class FloatType(ValueType):
    """CQL ``float``: the value is the seed converted to a float."""

    def __init__(self) -> None:
        self.name: str = 'float'

    def val(self, seed: int) -> float:
        as_float = float(seed)
        return as_float
class UUIDType(ValueType):
    """CQL ``uuid``: the seed's zero-padded decimal digits form the last UUID group."""

    def __init__(self) -> None:
        self.name: str = 'uuid'

    def val(self, seed: int) -> uuid.UUID:
        # Decimal digits are valid hex digits, so distinct small seeds map to distinct UUIDs.
        text = f"{{00000000-0000-0000-0000-{seed:012}}}"
        return uuid.UUID(text)
class CounterType(ValueType):
    """CQL ``counter``: the value is the seed itself.

    Not among Column's random type choices; it must be requested explicitly.
    """

    def __init__(self) -> None:
        self.name: str = 'counter'

    def val(self, seed: int) -> int:
        value = seed
        return value
class Column():
    """A column definition: a name plus a value type.

    If no value type is specified, one of int/text/float/uuid is picked at random.
    There is no support for collection or user-defined types.
    """
    def __init__(self, name: str, ctype: Optional[Type[ValueType]] = None):
        self.name: str = name
        if ctype is None:
            ctype = random.choice([IntType, TextType, FloatType, UUIDType])
        self.ctype = ctype()
        # "<name> <type>" fragment used in CREATE TABLE column definitions.
        self.cql: str = f"{self.name} {self.ctype.name}"

    def val(self, seed):
        """Return this column's value for ``seed``.

        Deterministic: it delegates to the value type's ``val``.
        """
        return self.ctype.val(seed)

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return f"Column({self.name!r}, {type(self.ctype).__name__})"
class RandomTable():
    """A managed random table.

    Keeps a client-side model of the table (columns, indexes) that is updated together
    with the schema changes it issues, so rows can be generated and the schema verified.
    """
    # Sequential unique id shared by all instances; used to build default table names.
    newid = itertools.count(start=1).__next__

    def __init__(self, manager: ManagerClient, keyspace: str, ncolumns: Optional[int] = None,
                 columns: Optional[List[Column]] = None, pks: int = 2, name: Optional[str] = None):
        """Set up a new table definition from column definitions.

        If ``columns`` is given, a copy of that list becomes the table's columns.
        Otherwise ``ncolumns`` columns are generated: ``pk`` (random type), ``pks - 1``
        text clustering columns ``c_xx``, and value columns ``v_xx`` of random types.
        The first ``pks`` columns form the primary key: the first one is the partition key
        and the rest are clustering columns. At least one value column is required.
        Nothing is sent to the server until create() is called.
        """
        self.id: int = RandomTable.newid()
        self.manager: ManagerClient = manager
        self.keyspace: str = keyspace
        self.name: str = name if name is not None else f"t_{self.id:02}"
        self.full_name: str = keyspace + "." + self.name
        self.next_clustering_id = itertools.count(start=1).__next__
        self.next_value_id = itertools.count(start=1).__next__
        # TODO: assumes primary key is composed of first self.pks columns
        self.pks = pks
        if columns is not None:
            assert len(columns) > pks, "Not enough value columns provided"
            # Copy, so add_column()/drop_column() don't mutate the caller's list.
            self.columns = list(columns)
        else:
            assert isinstance(ncolumns, int) and ncolumns > pks, "Not enough value columns provided"
            # Primary key pk, clustering columns c_xx, value columns v_xx
            self.columns = [Column("pk")]
            self.columns += [Column(f"c_{self.next_clustering_id():02}", ctype=TextType)
                             for _ in range(1, pks)]
            self.columns += [Column(f"v_{self.next_value_id():02}")
                             for _ in range(1, ncolumns - pks + 1)]
        self.removed_columns: List[Column] = []
        # Counter for sequential values to insert
        self.next_seq = itertools.count(start=1).__next__
        self.next_idx_id = itertools.count(start=1).__next__
        self.indexes: Set[str] = set()
        self.removed_indexes: Set[str] = set()

    @property
    def all_col_names(self) -> str:
        """Get all column names comma separated for CQL query generation convenience"""
        return ", ".join([c.name for c in self.columns])

    async def create(self, if_not_exists: bool = False) -> asyncio.Future:
        """Create this table; the first ``self.pks`` columns form the primary key."""
        col_defs = ", ".join(c.cql for c in self.columns)
        pk_names = ", ".join(c.name for c in self.columns[:self.pks])
        if_not_exists_clause = "IF NOT EXISTS " if if_not_exists else ""
        cql_stmt = f"CREATE TABLE {if_not_exists_clause}{self.full_name} " \
                   f"({col_defs}, primary key({pk_names}))"
        logger.debug(cql_stmt)
        assert self.manager.cql is not None
        return await self.manager.cql.run_async(cql_stmt)

    async def drop(self, if_exists: bool = False) -> asyncio.Future:
        """Drop this table"""
        cql_stmt = f"DROP TABLE {'IF EXISTS ' if if_exists else ''}{self.full_name}"
        logger.debug(cql_stmt)
        assert self.manager.cql is not None
        return await self.manager.cql.run_async(cql_stmt)

    async def add_column(self, name: Optional[str] = None, ctype: Optional[Type[ValueType]] = None,
                         column: Optional[Column] = None):
        """Add a value column to the table.

        Either pass a ready ``column``, or ``name``/``ctype``, which default to the next
        ``v_xx`` name and TextType. The local model is updated before ALTER TABLE runs.
        """
        if column is not None:
            assert type(column) is Column, "Wrong column type to add_column"
        else:
            name = name if name is not None else f"v_{self.next_value_id():02}"
            ctype = ctype if ctype is not None else TextType
            column = Column(name, ctype=ctype)
        self.columns.append(column)
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} "
                                         f"ADD {column.name} {column.ctype.name}")

    async def drop_column(self, column: Union[Column, str, int, None] = None):
        """Drop a column and remember it in ``removed_columns``.

        ``column`` may be None (a random value column), a position (int, must not be a
        primary key position), a name (raises ColumnNotFound if absent) or a Column.
        Only the None and int forms check that the column is not part of the primary key.
        Refuses to drop the last value column.
        """
        if column is None:
            col = random.choice(self.columns[self.pks:])
        elif type(column) is int:
            assert column >= self.pks, f"Cannot remove {self.name} PK column at pos {column}"
            col = self.columns[column]
        elif type(column) is str:
            try:
                col = next(c for c in self.columns if c.name == column)
            except StopIteration:
                raise ColumnNotFound(f"Column {column} not found in table {self.name}")
        else:
            assert type(column) is Column, f"can not remove unknown type {type(column)}"
            assert column in self.columns, f"column {column.name} not present"
            col = column
        assert len(self.columns) - 1 > self.pks, f"Cannot remove last value column {col.name} from {self.name}"
        self.columns.remove(col)
        self.removed_columns.append(col)
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} DROP {col.name}")

    async def insert_seq(self) -> asyncio.Future:
        """Insert a row of next sequential values (every column derives its value from one seed)."""
        seed = self.next_seq()
        assert self.manager.cql is not None
        placeholders = ", ".join(["%s"] * len(self.columns))
        return await self.manager.cql.run_async(
            f"INSERT INTO {self.full_name} ({self.all_col_names}) VALUES ({placeholders})",
            parameters=[c.val(seed) for c in self.columns])

    async def add_index(self, column: Union[Column, str, int], name: Optional[str] = None) -> str:
        """Create a secondary index and return its name.

        ``column`` is a position (int, not 0 - the partition key), a name, or a Column.
        The default index name is ``<table>_<column>_<nn>``.
        """
        if isinstance(column, int):
            assert column > 0, f"Cannot create secondary index " \
                               f"on partition key column {self.columns[0].name}"
            col_name = self.columns[column].name
        elif isinstance(column, str):
            col_name = column
        elif isinstance(column, Column):
            assert column in self.columns
            col_name = column.name
        else:
            raise TypeError(f"Wrong column type {type(column)} given to add_index")
        name = name if name is not None else f"{self.name}_{col_name}_{self.next_idx_id():02}"
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"CREATE INDEX {name} on {self.full_name} ({col_name})")
        self.indexes.add(name)
        return name

    async def drop_index(self, name: str) -> None:
        """Drop a managed index by name (KeyError if it is not managed)."""
        self.indexes.remove(name)
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"DROP INDEX {self.keyspace}.{name}")
        self.removed_indexes.add(name)

    async def enable_cdc(self) -> None:
        """Enable CDC on this table."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} WITH cdc = {{ 'enabled' : true }}")

    async def disable_cdc(self) -> None:
        """Disable CDC on this table."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} WITH cdc = {{ 'enabled' : false }}")

    def __str__(self):
        return self.full_name
class RandomTables():
    """A list of managed random tables living in a single keyspace."""

    def __init__(self, test_name: str, manager: ManagerClient, keyspace: str,
                 replication_factor: int,
                 dc_replication_factor: Optional[dict[str, int]] = None,
                 enable_tablets: None | bool = None):
        """Create ``keyspace`` if it does not exist and start with no managed tables.

        The keyspace uses NetworkTopologyStrategy with ``replication_factor``; entries of
        ``dc_replication_factor`` (DC name -> factor) are added to the same replication map.
        When ``enable_tablets`` is not None an explicit ``TABLETS = {'enabled': ...}``
        clause is appended.
        """
        # Assemble the complete replication map before appending any other clause, so
        # per-DC factors can never be spliced into the TABLETS map.
        replication = f"'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor}"
        if dc_replication_factor is not None:
            replication += "".join(f",'{dc}': {rf}" for dc, rf in dc_replication_factor.items())
        keyspace_query = f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH REPLICATION = {{ {replication}}}"
        if enable_tablets is not None:
            tablets_enabled = str(enable_tablets).lower()
            keyspace_query += f" AND TABLETS = {{'enabled': {tablets_enabled} }}"
        self.test_name = test_name
        self.manager = manager
        self.keyspace = keyspace
        self.tables: List[RandomTable] = []
        self.removed_tables: List[RandomTable] = []
        assert self.manager.cql is not None
        self.manager.cql.execute(keyspace_query)

    async def add_tables(self, ntables: int = 1, ncolumns: int = 5, if_not_exists: bool = False) -> None:
        """Add random tables to the list.
        ntables specifies how many tables.
        ncolumns specifies how many random columns per table."""
        tables = [RandomTable(self.manager, self.keyspace, ncolumns) for _ in range(ntables)]
        await asyncio.gather(*(t.create(if_not_exists) for t in tables))
        self.tables.extend(tables)

    async def add_table(self, ncolumns: Optional[int] = None, columns: Optional[List[Column]] = None,
                        pks: int = 2, name: Optional[str] = None,
                        if_not_exists: bool = False) -> RandomTable:
        """Add a random table. See random_tables.RandomTable()"""
        table = RandomTable(self.manager, self.keyspace, ncolumns=ncolumns, columns=columns,
                            pks=pks, name=name)
        await table.create(if_not_exists)
        self.tables.append(table)
        return table

    async def add_udt(self, name: str, cmd: str) -> None:
        """Create user-defined type ``name`` in this keyspace; ``cmd`` is the field list."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"CREATE TYPE {self.keyspace}.{name} {cmd}")

    async def drop_udt(self, name) -> None:
        """Drop user-defined type ``name`` from this keyspace."""
        assert self.manager.cql is not None
        await self.manager.cql.run_async(f"DROP TYPE {self.keyspace}.{name}")

    def __getitem__(self, pos: int) -> RandomTable:
        return self.tables[pos]

    def append(self, table: RandomTable) -> None:
        self.tables.append(table)

    def extend(self, tables: List[RandomTable]) -> None:
        self.tables.extend(tables)

    async def drop_table(self, table: Union[str, RandomTable], if_exists: bool = False) -> RandomTable:
        """Drop a managed table given by name (plain or keyspace-qualified) or by instance.

        The table is moved from ``tables`` to ``removed_tables`` and returned.
        Raises ValueError if no managed table has the given name.
        """
        if isinstance(table, str):
            found = next((t for t in self.tables if table in (t.name, t.full_name)), None)
            if found is None:
                # A StopIteration escaping a coroutine would surface as an opaque RuntimeError.
                raise ValueError(f"Table {table} is not managed in keyspace {self.keyspace}")
            table = found
        else:
            assert isinstance(table, RandomTable), f"Invalid table type {type(table)}"
        await table.drop(if_exists)
        self.tables.remove(table)
        self.removed_tables.append(table)
        return table

    def drop_all(self) -> None:
        """Drop keyspace (and tables)"""
        assert self.manager.cql is not None
        self.manager.cql.execute(f"DROP KEYSPACE {self.keyspace}")

    @staticmethod
    def _expected_kind_and_position(pos: int, pks: int) -> tuple[str, int]:
        """Return the (kind, position) system_schema.columns reports for column index ``pos``.

        Column 0 is the partition key (position 0). Columns 1..pks-1 are clustering columns,
        numbered from 0 in clustering order. All others are regular columns (position -1).
        """
        if pos == 0:
            return "partition_key", 0
        if pos < pks:
            return "clustering", pos - 1
        return "regular", -1

    async def _verify_table_columns(self, cql, host, managed: RandomTable) -> None:
        """Assert that the server's columns of ``managed`` match its local column model."""
        cols = {c.name: c for c in managed.columns}
        c_pos = {c.name: i for i, c in enumerate(managed.columns)}
        cql_stmt2 = f"SELECT column_name, position, kind, type FROM system_schema.columns " \
                    f"WHERE keyspace_name = '{self.keyspace}' AND table_name = '{managed.name}'"
        logger.debug(cql_stmt2)
        res2 = {row.column_name: row for row in await cql.run_async(cql_stmt2, host=host)}
        assert res2.keys() == cols.keys(), f"Column names for {managed.name} do not match " \
                                           f"expected ({', '.join(cols.keys())}) " \
                                           f"got ({', '.join(res2.keys())})"
        for c_name, c in res2.items():
            col = cols[c_name]
            assert c.type == col.ctype.name, f"Column {c_name} type does not match " \
                                             f"{c.type} {col.ctype.name}"
            kind, schema_pos = self._expected_kind_and_position(c_pos[c_name], managed.pks)
            assert c.kind == kind, f"Column {c_name} kind does not match {c.kind} {kind}"
            assert c.position == schema_pos, f"Column {c_name} position {c.position} " \
                                             f"does not match {schema_pos}"

    async def verify_schema(self, table: Union[RandomTable, str, None] = None,
                            do_read_barrier: bool = True) -> None:
        """Verify the server-side schema of active managed tables against the local model.

        ``table`` limits the check to one table (a RandomTable, or a name with or without
        the keyspace prefix); by default every table in ``self.tables`` is checked.
        Asserts that each checked table exists and that its column names, types, kinds and
        positions match. With ``do_read_barrier`` a read barrier is issued first.
        """
        select_tables = f"SELECT table_name FROM system_schema.tables " \
                        f"WHERE keyspace_name = '{self.keyspace}'"
        given: Optional[RandomTable] = None
        if isinstance(table, RandomTable):
            given = table
            tables = {table.name}
            cql_stmt1 = f"{select_tables} AND table_name = '{table.name}'"
        elif isinstance(table, str):
            if table.startswith(f"{self.keyspace}."):
                table = table[len(self.keyspace) + 1:]
            tables = {table}
            cql_stmt1 = f"{select_tables} AND table_name = '{table}'"
        else:
            tables = set(t.name for t in self.tables)
            cql_stmt1 = select_tables
        logger.debug(cql_stmt1)
        cql = self.manager.cql
        assert cql
        host = await get_available_host(cql, time.time() + 60)
        if do_read_barrier:
            # Issue a read barrier on some node and then keep using that node to do the queries.
            # This ensures that the queries return recent data (at least all data committed
            # when `verify_schema` was called).
            await read_barrier(self.manager.api, get_host_api_address(host))
        res1 = {row.table_name for row in await cql.run_async(cql_stmt1, host=host)}
        assert not tables - res1, f"Tables {tables - res1} not present"
        for table_name in sorted(tables):
            managed = given if given is not None else \
                next((t for t in self.tables if t.name == table_name), None)
            assert managed is not None, f"Table {table_name} is not managed in keyspace {self.keyspace}"
            await self._verify_table_columns(cql, host, managed)