Files
scylladb/test/cluster/test_random_tables.py
Benny Halevy a4aa4d74c1 test/pylib: servers_add: add auto_rack_dc parameter
To quickly populate nodes in a single dc,
each node in its own rack.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-03-30 19:23:40 +03:00

126 lines
4.7 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import time
import pytest
from cassandra.protocol import InvalidRequest, ReadFailure # type: ignore
from cassandra.query import SimpleStatement # type: ignore
from test.cluster.util import wait_for_token_ring_and_group0_consistency
pytestmark = pytest.mark.prepare_3_racks_cluster
# Simple test of schema helper
@pytest.mark.asyncio
async def test_new_table(manager, random_tables):
cql = manager.cql
assert cql is not None
table = await random_tables.add_table(ncolumns=5)
# Before performing data queries, make sure that the token ring
# has converged (we're using ring_delay = 0).
# Otherwise the queries may pick wrong replicas.
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
await cql.run_async(f"INSERT INTO {table} ({','.join(c.name for c in table.columns)})" \
f"VALUES ({', '.join(['%s'] * len(table.columns))})",
parameters=[c.val(1) for c in table.columns])
pk_col = table.columns[0]
ck_col = table.columns[1]
vals = [pk_col.val(1), ck_col.val(1)]
res = await cql.run_async(f"SELECT * FROM {table} WHERE {pk_col}=%s AND {ck_col}=%s",
parameters=vals)
assert len(res) == 1
assert list(res[0])[:2] == vals
await random_tables.drop_table(table)
# NOTE: On rare occasions the exception is ReadFailure
with pytest.raises((InvalidRequest, ReadFailure),
match='(unconfigured table|failed to execute read)'):
await cql.run_async(f"SELECT * FROM {table}")
await random_tables.verify_schema()
# Simple test of schema helper with alter
@pytest.mark.asyncio
async def test_alter_verify_schema(manager, random_tables):
"""Verify table schema"""
cql = manager.cql
assert cql is not None
await random_tables.add_tables(ntables=4, ncolumns=5)
await random_tables.verify_schema()
# Manually remove a column
table = random_tables[0]
await cql.run_async(f"ALTER TABLE {table} DROP {table.columns[-1].name}")
with pytest.raises(AssertionError, match='Column'):
await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_new_table_insert_one(manager, random_tables):
cql = manager.cql
assert cql is not None
table = await random_tables.add_table(ncolumns=5)
await table.insert_seq()
pk_col = table.columns[0]
ck_col = table.columns[1]
vals = [pk_col.val(1), ck_col.val(1)]
res = await cql.run_async(f"SELECT * FROM {table} WHERE pk=%s AND {ck_col}=%s",
parameters=vals)
assert len(res) == 1
assert list(res[0])[:2] == vals
@pytest.mark.asyncio
async def test_drop_column(manager, random_tables):
"""Drop a random column from a table"""
cql = manager.cql
assert cql is not None
table = await random_tables.add_table(ncolumns=5)
await table.insert_seq()
await table.drop_column()
res = (await cql.run_async(f"SELECT * FROM {table} WHERE pk=%s",
parameters=[table.columns[0].val(1)]))[0]
assert len(res) == 4
await table.drop_column()
res = (await cql.run_async(f"SELECT * FROM {table} WHERE pk=%s",
parameters=[table.columns[0].val(1)]))[0]
assert len(res) == 3
await random_tables.verify_schema(table)
@pytest.mark.asyncio
async def test_add_index(random_tables):
"""Add and drop an index"""
table = await random_tables.add_table(ncolumns=5)
with pytest.raises(AssertionError, match='partition key'):
await table.add_index(0)
await table.add_index(2)
await random_tables.verify_schema(table)
@pytest.mark.asyncio
async def test_paged_result(manager, random_tables):
"""Test run_async with paged results"""
cql = manager.cql
assert cql is not None
table = await random_tables.add_table(ncolumns=5)
inserts = []
nrows = 21
fetch_size = 10 # Get only 10 rows at a time
for i in range(nrows):
inserts.append(cql.run_async(f"INSERT INTO {table} "
f"({','.join(c.name for c in table.columns)})"
f"VALUES ({', '.join(['%s'] * len(table.columns))})",
parameters=[c.val(i) for c in table.columns]))
await asyncio.gather(*inserts)
# Check only 1 page
stmt = SimpleStatement(f"SELECT * FROM {table} ALLOW FILTERING", fetch_size = fetch_size)
res = await cql.run_async(stmt, all_pages=False)
assert len(res) == fetch_size
# Check all pages
res = await cql.run_async(stmt, all_pages = True)
assert len(res) == nrows