Remove many unused "import" statements or parts of import statement. All of them were detected by Copilot, but I verified each one manually and prepared this patch. Signed-off-by: Nadav Har'El <nyh@scylladb.com> Closes scylladb/scylladb#27675
228 lines
10 KiB
Python
228 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2020-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
#############################################################################
|
|
# Tests for the service levels infrastructure. Service levels can be attached
|
|
# to roles in order to apply various role-specific parameters, like timeouts.
|
|
#############################################################################
|
|
|
|
from contextlib import contextmanager, ExitStack
|
|
from .util import unique_name, new_user
|
|
from .rest_api import scylla_inject_error
|
|
|
|
from cassandra.protocol import InvalidRequest
|
|
from cassandra.util import Duration
|
|
|
|
import pytest
|
|
|
|
# MAX_USER_SERVICE_LEVELS represents the maximal number of service levels that users can create.
|
|
# The value is documented in `docs/features/workload-prioritization.rst`.
|
|
# It's used for regression testing of user service level limit in this and other files.
|
|
# In this file the constant is compared against the number of service levels
# that test_scheduling_groups_limit manages to create.
MAX_USER_SERVICE_LEVELS = 8
|
|
|
|
# Context manager that creates a uniquely named service level, attaches it to
# a role and yields the service level's name.
#
# Parameters:
#   cql           - session used to execute the CQL statements.
#   timeout       - interpolated verbatim as `timeout = <value>`.
#   workload_type - interpolated as a quoted string, `workload_type = '<value>'`.
#   shares        - interpolated verbatim as `shares = <value>`.
#   role          - role to attach the service level to; when falsy, the
#                   username of the session's auth provider is used instead.
#
# An option is only added to the WITH clause when its argument is truthy, so
# e.g. shares=0 is treated the same as "not given".
#
# On exit - including when CREATE or ATTACH raised - the service level is
# detached from the role and then dropped (DROP ... IF EXISTS).
@contextmanager
def new_service_level(cql, timeout=None, workload_type=None, shares=None, role=None):
    options = []
    if timeout:
        options.append(f"timeout = {timeout} ")
    if workload_type:
        options.append(f"workload_type = '{workload_type}' ")
    if shares:
        options.append(f"shares = {shares} ")
    # Every fragment ends with a space, so joining on "AND " yields
    # "WITH a AND b " - the same text the statement always received.
    params = ("WITH " + "AND ".join(options)) if options else ""

    attach_to = role if role else cql.cluster.auth_provider.username

    # Choose the name before entering `try`, so that the cleanup in `finally`
    # can always refer to it and never hides an earlier error behind an
    # unbound-variable failure.
    sl = f"sl_{unique_name()}"
    try:
        cql.execute(f"CREATE SERVICE LEVEL {sl} {params}")
        cql.execute(f"ATTACH SERVICE LEVEL {sl} TO {attach_to}")
        yield sl
    finally:
        cql.execute(f"DETACH SERVICE LEVEL FROM {attach_to}")
        cql.execute(f"DROP SERVICE LEVEL IF EXISTS {sl}")
|
|
|
|
# Check that ALTER SERVICE LEVEL ... WITH timeout changes the timeout reported
# by LIST SERVICE LEVEL, and that setting the timeout to null clears it.
def test_set_service_level_timeouts(scylla_only, cql):
    def listed_timeout(name):
        # Timeout column of the single row returned for this service level.
        return cql.execute(f"LIST SERVICE LEVEL {name}").one().timeout

    with new_service_level(cql) as sl:
        cql.execute(f"ALTER SERVICE LEVEL {sl} WITH timeout = 575ms")
        # The third Duration argument is in nanoseconds.
        assert listed_timeout(sl) == Duration(0, 0, 575000000)

        cql.execute(f"ALTER SERVICE LEVEL {sl} WITH timeout = 2h")
        assert listed_timeout(sl) == Duration(0, 0, 2*60*60*10**9)

        cql.execute(f"ALTER SERVICE LEVEL {sl} WITH timeout = null")
        assert not listed_timeout(sl)
|
|
|
|
# Check that each of the invalid timeout values below makes
# ALTER SERVICE LEVEL raise an error.
def test_validate_service_level_timeouts(scylla_only, cql):
    rejected_values = ('1ns', '-5s', 'writetime', '1second', '10d', '5y', '7', '0')
    with new_service_level(cql) as sl:
        for bad_timeout in rejected_values:
            # Printed so that a failure shows which value was being tried.
            print(f"Checking {bad_timeout}")
            statement = f"ALTER SERVICE LEVEL {sl} WITH timeout = {bad_timeout}"
            with pytest.raises(Exception):
                cql.execute(statement)
|
|
|
|
# Check that a new service level shows up as attached to the session user's
# role, both when listing that role's attachment and when listing all of them.
def test_attached_service_level(scylla_only, cql):
    username = cql.cluster.auth_provider.username
    with new_service_level(cql) as sl:
        listings = (
            f"LIST ATTACHED SERVICE LEVEL OF {username}",
            "LIST ALL ATTACHED SERVICE LEVELS",
        )
        for listing in listings:
            attachment = cql.execute(listing).one()
            assert attachment.role == username and attachment.service_level == sl
|
|
|
|
# Check LIST EFFECTIVE SERVICE LEVEL for two roles where r2 is granted to r1.
# sl1 (timeout only) is attached to r1 and sl2 (workload_type only) to r2.
# r1 is expected to report the timeout from sl1 and the workload type from
# sl2, while r2 reports both options as coming from sl2, with no timeout value.
def test_list_effective_service_level(scylla_only, cql):
    timeout = "10s"
    workload_type = "batch"

    with new_user(cql, "r1") as r1, \
         new_user(cql, "r2") as r2, \
         new_service_level(cql, timeout=timeout, role=r1) as sl1, \
         new_service_level(cql, workload_type=workload_type, role=r2) as sl2:
        cql.execute(f"GRANT {r2} TO {r1}")

        for row in cql.execute(f"LIST EFFECTIVE SERVICE LEVEL OF {r1}"):
            if row.service_level_option == "timeout":
                assert row.effective_service_level == sl1
                assert row.value == timeout
            if row.service_level_option == "workload_type":
                assert row.effective_service_level == sl2
                assert row.value == workload_type

        for row in cql.execute(f"LIST EFFECTIVE SERVICE LEVEL OF {r2}"):
            if row.service_level_option == "timeout":
                assert row.effective_service_level == sl2
                # sl2 was created without a timeout.
                assert row.value is None
            if row.service_level_option == "workload_type":
                assert row.effective_service_level == sl2
                assert row.value == workload_type
|
|
|
|
# Check the effective "shares" option for two roles where r2 is granted to r1.
# r1 has a service level with 500 shares and r2 one with 200 shares; both
# roles are expected to report r2's service level and its 200 shares.
def test_list_effective_service_level_shares(scylla_only, cql):
    shares1 = 500
    shares2 = 200

    # The name of r1's service level is never checked, so it is not bound.
    with new_user(cql, "r1") as r1, \
         new_user(cql, "r2") as r2, \
         new_service_level(cql, shares=shares1, role=r1), \
         new_service_level(cql, shares=shares2, role=r2) as sl2:
        cql.execute(f"GRANT {r2} TO {r1}")

        expected_value = str(shares2)
        for listed_role in (r1, r2):
            for row in cql.execute(f"LIST EFFECTIVE SERVICE LEVEL OF {listed_role}"):
                if row.service_level_option != "shares":
                    continue
                assert row.effective_service_level == sl2
                assert row.value == expected_value
|
|
|
|
# Check that LIST EFFECTIVE SERVICE LEVEL fails with InvalidRequest for a
# role that has no service level attached.
def test_list_effective_service_level_without_attached(scylla_only, cql):
    with new_user(cql) as role:
        expected_error = f"Role {role} doesn't have assigned any service level"
        statement = f"LIST EFFECTIVE SERVICE LEVEL OF {role}"
        with pytest.raises(InvalidRequest, match=expected_error):
            cql.execute(statement)
|
|
|
|
# ScyllaDB limits the number of service levels to a small number (10 including
# 1 default and 1 driver service level).
# This test verifies that attempting to create more service levels than that
# results in an InvalidRequest error and doesn't silently succeed.
# The test also has a regression check that a user can create exactly
# MAX_USER_SERVICE_LEVELS (8) service levels.
# In case you are adding a new internal scheduling group and this test failed,
# you should increase `SCHEDULING_GROUPS_COUNT`.
#
# Reproduces enterprise issue #4481.
# Reproduces enterprise issue #5014.
def test_scheduling_groups_limit(scylla_only, cql):
    attempts = 100
    created_count = 0
    expected_error = "Can't create service level - no more scheduling groups exist"

    with pytest.raises(InvalidRequest, match=expected_error):
        # The ExitStack keeps every created service level alive until the
        # failing creation unwinds it.
        with ExitStack() as stack:
            for _ in range(attempts):
                stack.enter_context(new_service_level(cql))
                created_count += 1

    assert created_count > 0
    assert created_count == MAX_USER_SERVICE_LEVELS  # regression check
|
|
|
|
# With the "create_service_levels_without_default_shares" error injection
# enabled, check that a service level created without explicit shares is
# still listed with 1000 shares, both by LIST EFFECTIVE SERVICE LEVEL and by
# LIST SERVICE LEVEL.
def test_default_shares_in_listings(scylla_only, cql):
    with scylla_inject_error(cql, "create_service_levels_without_default_shares", one_shot=False), \
         new_user(cql) as role:
        with new_service_level(cql, role=role) as sl:
            effective_rows = cql.execute(f"LIST EFFECTIVE SERVICE LEVEL OF {role}")
            shares_rows = [r for r in effective_rows if r.service_level_option == "shares"]
            shares_info = shares_rows[0]
            # The effective listing reports the value as a string...
            assert shares_info.value == "1000"
            assert shares_info.effective_service_level == sl

            # ...while LIST SERVICE LEVEL exposes it as a number.
            listed = cql.execute(f"LIST SERVICE LEVEL {sl}").one()
            assert listed.shares == 1000
|
|
|
|
# Verify that the default service level cannot be created, altered, attached
# to a role or dropped, and that each attempt is rejected with an
# InvalidRequest carrying an informative message.
def test_manipulating_default_service_level(cql, scylla_only):
    default_sl = "default"

    with new_user(cql) as role:
        # (statement, expected error message) pairs, tried in this order.
        rejected_operations = [
            # Creation.
            (f"CREATE SERVICE LEVEL {default_sl} WITH SHARES = 100",
             f"The default service level, {default_sl}, already exists and cannot be created"),
            # Alteration.
            (f"ALTER SERVICE LEVEL {default_sl} WITH SHARES = 200",
             f"The default service level, {default_sl}, cannot be altered"),
            # Attachment.
            (f"ATTACH SERVICE LEVEL {default_sl} TO {role}",
             f"The default service level, {default_sl}, cannot be attached to a role. "
             "If you want to detach an attached service level, use the DETACH SERVICE "
             "LEVEL statement"),
            # Dropping.
            (f"DROP SERVICE LEVEL {default_sl}",
             f"The default service level, {default_sl}, cannot be dropped"),
        ]

        for statement, expected_error in rejected_operations:
            with pytest.raises(InvalidRequest, match=expected_error):
                cql.execute(statement)
|
|
|
|
# Sanity check: a service level whose name merely resembles the default
# service level's name, without being the same, can be created, altered,
# attached and dropped like any other.
def test_manipulating_fake_default_service_level(cql, scylla_only):
    # Service levels are case-sensitive (if used with quotation marks).
    fake_default_sl = '"DeFaUlT"'

    with new_user(cql) as role:
        # Creation, alteration, attachment and dropping, in that order.
        statements = (
            f"CREATE SERVICE LEVEL {fake_default_sl} WITH SHARES = 100",
            f"ALTER SERVICE LEVEL {fake_default_sl} WITH SHARES = 200",
            f"ATTACH SERVICE LEVEL {fake_default_sl} TO {role}",
            f"DROP SERVICE LEVEL {fake_default_sl}",
        )
        try:
            for statement in statements:
                cql.execute(statement)
        finally:
            # Clean up even if one of the statements above failed midway.
            cql.execute(f"DROP SERVICE LEVEL IF EXISTS {fake_default_sl}")
|