Files
scylladb/test/cqlpy/test_virtual_tables.py
Vlad Zolotarov 85adf6bdb1 system.clients: add a client_options column
This new column is going to contain all OPTIONS sent in the
STARTUP frame of the corresponding CQL session.

The new column has a `frozen<map<text, text>>` type, and
we are also optimizing the amount of required memory for storing
corresponding keys and values by caching them on each shard level.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-20 12:26:15 -05:00

181 lines
8.7 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2021-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import pytest
from . import nodetool
from . import util
import json
from collections import defaultdict
def verify_snapshots(cql, expected_snapshots: dict[str, set[str]]):
results = list(cql.execute(f"SELECT keyspace_name, table_name, snapshot_name, live, total FROM system.snapshots"))
for res in results:
if res.snapshot_name in expected_snapshots:
t = f"{res.keyspace_name}.{res.table_name}"
assert t in expected_snapshots[res.snapshot_name], f"Unexpected snapshot {t}: snapshot_name={res.snapshot_name}: expected_snapshots={expected_snapshots}"
expected_snapshots[res.snapshot_name].remove(t)
for _, expected_tables in expected_snapshots.items():
assert not expected_tables, f"Not all expected snapshots were listed: expected_snapshots={expected_snapshots}"
def test_snapshots_table(scylla_only, cql, test_keyspace):
test_tag = util.unique_name()
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
cql.execute(f"INSERT INTO {table} (pk, v) VALUES (0, 0)")
nodetool.take_snapshot(cql, table, test_tag, False)
verify_snapshots(cql, {test_tag: [table]})
nodetool.del_snapshot(cql, test_tag)
def test_snapshots_dropped_table(scylla_only, cql, test_keyspace):
test_tag = util.unique_name()
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
cql.execute(f"INSERT INTO {table} (pk, v) VALUES (0, 0)")
nodetool.take_snapshot(cql, table, test_tag, False)
verify_snapshots(cql, {test_tag: [table]})
nodetool.del_snapshot(cql, test_tag)
def test_snapshots_multiple_keyspaces(scylla_only, cql):
expected_snapshots = defaultdict(set)
ks_opts = "WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
test_tags = [util.unique_name(), util.unique_name(), util.unique_name()]
with util.new_test_keyspace(cql, ks_opts) as test_keyspace1:
with util.new_test_table(cql, test_keyspace1, 'pk int PRIMARY KEY, v int') as table1:
cql.execute(f"INSERT INTO {table1} (pk, v) VALUES (0, 0)")
nodetool.take_snapshot(cql, table1, test_tags[0], False)
expected_snapshots[test_tags[0]].add(table1)
cql.execute(f"INSERT INTO {table1} (pk, v) VALUES (1, 1)")
nodetool.take_snapshot(cql, table1, test_tags[1], False)
expected_snapshots[test_tags[1]].add(table1)
with util.new_test_keyspace(cql, ks_opts) as test_keyspace2:
with util.new_test_table(cql, test_keyspace2, 'pk int PRIMARY KEY, v int') as table2:
cql.execute(f"INSERT INTO {table2} (pk, v) VALUES (0, 0)")
nodetool.take_snapshot(cql, table2, test_tags[0], False)
expected_snapshots[test_tags[0]].add(table2)
cql.execute(f"INSERT INTO {table2} (pk, v) VALUES (2, 2)")
nodetool.take_snapshot(cql, table2, test_tags[2], False)
expected_snapshots[test_tags[2]].add(table2)
verify_snapshots(cql, expected_snapshots)
for t in test_tags:
nodetool.del_snapshot(cql, t)
def test_clients(scylla_only, cql):
columns = ', '.join([
'address',
'port',
'client_type',
'connection_stage',
'driver_name',
'driver_version',
'hostname',
'protocol_version',
'shard_id',
'ssl_cipher_suite',
'ssl_enabled',
'ssl_protocol',
'username',
'client_options',
])
cls = list(cql.execute(f"SELECT {columns} FROM system.clients"))
# There must be at least one connection - the one that sent this SELECT
# request.
assert len(cls) > 0
for cl in cls:
assert(cl[0] == '127.0.0.1')
assert(cl[2] == 'cql')
client_options = cl[13]
assert(client_options.get('DRIVER_NAME') == cl[4])
assert(client_options.get('DRIVER_VERSION') == cl[5])
# We only want to check that the table exists with the listed columns, to assert
# backwards compatibility.
def _check_exists(cql, table_name, columns):
cols = ", ".join(columns)
assert list(cql.execute(f"SELECT {cols} FROM system.{table_name}"))
def test_protocol_servers(scylla_only, cql):
_check_exists(cql, "protocol_servers", ("name", "listen_addresses", "protocol", "protocol_version"))
def test_runtime_info(scylla_only, cql):
_check_exists(cql, "runtime_info", ("group", "item", "value"))
def test_versions(scylla_only, cql):
_check_exists(cql, "versions", ("key", "build_id", "build_mode", "version"))
# Check reading the system.config table, which should list all configuration
# parameters. As we noticed in issue #10047, each type of configuration
# parameter can have a different function for printing it out, and some of
# those may be wrong so we want to check as many as we can - including
# specifically the experimental_features option which was wrong in #10047
# and #11003.
def test_system_config_read(scylla_only, cql):
# All rows should have the columns name, source, type and value:
rows = list(cql.execute("SELECT name, source, type, value FROM system.config"))
values = dict()
for row in rows:
values[row.name] = row.value
# Check that experimental_features exists and makes sense.
# It needs to be a JSON-formatted strings, and the strings need to be
# ASCII feature names - not binary garbage as it was in #10047,
# and not numbers-formatted-as-string as in #11003.
assert 'experimental_features' in values
obj = json.loads(values['experimental_features'])
assert isinstance(obj, list)
assert isinstance(obj[0], str)
assert obj[0] and obj[0].isascii() and obj[0].isprintable()
assert not obj[0].isnumeric() # issue #11003
# Check formatting of tri_mode_restriction like
# restrict_dtcs. These need to be one of
# allowed string values 0, 1, true, false or warn - but in particular
# non-empty and printable ASCII, not garbage.
assert 'restrict_dtcs' in values
obj = json.loads(values['restrict_dtcs'])
assert isinstance(obj, str)
assert obj and obj.isascii() and obj.isprintable()
# Verify that boolean configuration items round-trip and use the yaml/json
# representation (true/false). #19791.
def test_system_config_update_boolean(scylla_only, cql):
var = 'compaction_enforce_min_threshold'
value = cql.execute(f"SELECT value FROM system.config WHERE name = '{var}'").one().value
assert value in ('true', 'false')
other = 'true' if value == 'false' else 'false'
cql.execute(f"UPDATE system.config SET value = '{other}' WHERE name = '{var}'")
readback = cql.execute(f"SELECT value FROM system.config WHERE name = '{var}'").one().value
assert readback == other
# just for completeness, check that writing 0/1 works too
cql.execute(f"UPDATE system.config SET value = '0' WHERE name = '{var}'")
readback = cql.execute(f"SELECT value FROM system.config WHERE name = '{var}'").one().value
assert readback == 'false'
cql.execute(f"UPDATE system.config SET value = '1' WHERE name = '{var}'")
readback = cql.execute(f"SELECT value FROM system.config WHERE name = '{var}'").one().value
assert readback == 'true'
# restore original
cql.execute(f"UPDATE system.config SET value = '{value}' WHERE name = '{var}'")
readback = cql.execute(f"SELECT value FROM system.config WHERE name = '{var}'").one().value
assert readback == value
def test_token_ring_vnodes(scylla_only, cql, test_keyspace_vnodes):
rows = list(cql.execute(f"SELECT * FROM system.token_ring WHERE keyspace_name = '{test_keyspace_vnodes}'"))
num_tokens = int(list(cql.execute("SELECT value FROM system.config WHERE name = 'num_tokens'"))[0].value)
assert len(rows) == num_tokens
for row in rows:
assert row.keyspace_name == test_keyspace_vnodes
assert row.table_name == "<ALL>"
def test_token_ring_tablets(scylla_only, cql, test_keyspace_tablets):
if test_keyspace_tablets is None:
pytest.skip("skipping tablets specific tests -- tablets not enabled")
with util.new_test_table(cql, test_keyspace_tablets, 'pk int PRIMARY KEY') as table:
rows = list(cql.execute(f"SELECT * FROM system.token_ring WHERE keyspace_name = '{test_keyspace_tablets}' AND table_name = '{table}'"))
tablets = list(cql.execute(f"SELECT * from system.tablets WHERE keyspace_name = '{test_keyspace_tablets}' AND table_name = '{table}' ALLOW FILTERING"))
assert len(rows) == len(tablets)
for row in rows:
assert row.keyspace_name == test_keyspace_tablets
assert row.table_name == table