diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index e1e1a1c9ba..2a6a3fab6f 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -42,6 +42,7 @@ set(swagger_files api-doc/messaging_service.json api-doc/metrics.json api-doc/raft.json + api-doc/service_levels.json api-doc/storage_proxy.json api-doc/storage_service.json api-doc/stream_manager.json @@ -82,6 +83,7 @@ target_sources(api lsa.cc messaging_service.cc raft.cc + service_levels.cc storage_proxy.cc storage_service.cc stream_manager.cc diff --git a/api/api-doc/service_levels.json b/api/api-doc/service_levels.json new file mode 100644 index 0000000000..58c1cf96bd --- /dev/null +++ b/api/api-doc/service_levels.json @@ -0,0 +1,56 @@ +{ + "apiVersion":"0.0.1", + "swaggerVersion":"1.2", + "basePath":"{{Protocol}}://{{Host}}", + "resourcePath":"/service_levels", + "produces":[ + "application/json" + ], + "apis":[ + { + "path":"/service_levels/switch_tenants", + "operations":[ + { + "method":"POST", + "summary":"Switch tenants on all opened connections if needed", + "type":"void", + "nickname":"do_switch_tenants", + "produces":[ + "application/json" + ], + "parameters":[] + } + ] + }, + { + "path":"/service_levels/count_connections", + "operations":[ + { + "method":"GET", + "summary":"Count opened CQL connections per scheduling group per user", + "type":"connections_count_map", + "nickname":"count_connections", + "produces":[ + "application/json" + ], + "parameters":[] + } + ] + } + ], + "models":{}, + "components": { + "schemas": { + "connections_count_map": { + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": { + "type": "integer" + } + } + } + } + } + +} \ No newline at end of file diff --git a/api/api.cc b/api/api.cc index f3ba941f07..ff5c06432d 100644 --- a/api/api.cc +++ b/api/api.cc @@ -36,6 +36,7 @@ #include "tasks.hh" #include "raft.hh" #include "gms/gossip_address_map.hh" +#include "service_levels.hh" logging::logger apilog("api"); @@ -358,6 +359,12 @@ 
future<> unset_server_cql_server_test(http_context& ctx) { #endif +future<> set_server_service_levels(http_context &ctx, cql_transport::controller& ctl, sharded& qp) { + return register_api(ctx, "service_levels", "The service levels API", [&ctl, &qp] (http_context& ctx, routes& r) { + set_service_levels(ctx, r, ctl, qp); + }); +} + future<> set_server_tasks_compaction_module(http_context& ctx, sharded& ss, sharded& snap_ctl) { auto rb = std::make_shared < api_registry_builder > (ctx.api_doc); diff --git a/api/api_init.hh b/api/api_init.hh index 236600c797..435f637c0c 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -73,6 +73,10 @@ namespace tasks { class task_manager; } +namespace cql3 { +class query_processor; +} + namespace api { struct http_context { @@ -141,6 +145,7 @@ future<> set_format_selector(http_context& ctx, db::sstables_format_selector& se future<> unset_format_selector(http_context& ctx); future<> set_server_cql_server_test(http_context& ctx, cql_transport::controller& ctl); future<> unset_server_cql_server_test(http_context& ctx); +future<> set_server_service_levels(http_context& ctx, cql_transport::controller& ctl, sharded& qp); future<> set_server_commitlog(http_context& ctx, sharded&); future<> unset_server_commitlog(http_context& ctx); diff --git a/api/service_levels.cc b/api/service_levels.cc new file mode 100644 index 0000000000..753a6e3198 --- /dev/null +++ b/api/service_levels.cc @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "service_levels.hh" +#include "api/api-doc/service_levels.json.hh" +#include "cql3/query_processor.hh" +#include "cql3/untyped_result_set.hh" +#include "db/consistency_level_type.hh" +#include "seastar/json/json_elements.hh" +#include "transport/controller.hh" +#include + + +namespace api { + +namespace sl = httpd::service_levels_json; +using namespace json; +using namespace seastar::httpd; + + +void 
set_service_levels(http_context& ctx, routes& r, cql_transport::controller& ctl, sharded& qp) { + sl::do_switch_tenants.set(r, [&ctl] (std::unique_ptr req) -> future { + co_await ctl.update_connections_scheduling_group(); + co_return json_void(); + }); + + sl::count_connections.set(r, [&qp] (std::unique_ptr req) -> future { + auto connections = co_await qp.local().execute_internal( + "SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' ALLOW FILTERING", + db::consistency_level::LOCAL_ONE, + cql3::query_processor::cache_internal::no + ); + + using connections_per_user = std::unordered_map; + using connections_per_scheduling_group = std::unordered_map; + connections_per_scheduling_group result; + + for (auto it = connections->begin(); it != connections->end(); it++) { + auto user = it->get_as("username"); + auto shg = it->get_as("scheduling_group"); + + if (result.contains(shg)) { + result[shg][user]++; + } + else { + result[shg] = {{user, 1}}; + } + } + + co_return result; + }); + +} + + + + +} \ No newline at end of file diff --git a/api/service_levels.hh b/api/service_levels.hh new file mode 100644 index 0000000000..e2e3993774 --- /dev/null +++ b/api/service_levels.hh @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "api.hh" + +namespace api { + +void set_service_levels(http_context& ctx, httpd::routes& r, cql_transport::controller& ctl, sharded& qp); + +} \ No newline at end of file diff --git a/configure.py b/configure.py index 5960768e4f..68c05bad79 100755 --- a/configure.py +++ b/configure.py @@ -1215,6 +1215,8 @@ api = ['api/api.cc', Json2Code('api/api-doc/raft.json'), Json2Code('api/api-doc/cql_server_test.json'), 'api/cql_server_test.cc', + 'api/service_levels.cc', + Json2Code('api/api-doc/service_levels.json'), ] alternator = [ diff --git a/main.cc b/main.cc index bc8980cbe8..cd4da43c52 100644 --- a/main.cc 
+++ b/main.cc @@ -2286,6 +2286,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Register controllers after drain_on_shutdown() below, so that even on start // failure drain is called and stops controllers cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, cql_sg_stats_key, maintenance_socket_enabled::no, dbcfg.statement_scheduling_group); + + api::set_server_service_levels(ctx, cql_server_ctl, qp).get(); + alternator::controller alternator_ctl(gossiper, proxy, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, *cfg, dbcfg.statement_scheduling_group); redis::controller redis_ctl(proxy, auth_service, mm, *cfg, gossiper, dbcfg.statement_scheduling_group); diff --git a/test/cqlpy/test_service_level_api.py b/test/cqlpy/test_service_level_api.py new file mode 100644 index 0000000000..6a6ae25d9c --- /dev/null +++ b/test/cqlpy/test_service_level_api.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# Copyright 2024-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + +######################################## +# Tests for the service levels HTTP API. 
########################################

import pytest
from .rest_api import get_request, post_request
from .util import new_session, unique_name
import time

# Fetch the connection counts from the `service_levels/count_connections`
# REST endpoint and hand back whatever `get_request` returns.
# `retry_unauthenticated` is accepted but not used by this helper.
def count_opened_connections(cql, retry_unauthenticated=True):
    return get_request(cql, "service_levels/count_connections")

# POST to the `service_levels/switch_tenants` REST endpoint and return
# the result of `post_request`.
def switch_tenants(cql):
    reply = post_request(cql, "service_levels/switch_tenants")
    return reply

# Build the same {scheduling_group: {username: count}} mapping directly
# from `system.clients`, counting one per returned row.
def count_opened_connections_from_table(cql):
    rows = cql.execute("SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' ALLOW FILTERING")
    tally = {}
    for row in rows:
        user, shg = row[0], row[1]
        per_user = tally.setdefault(shg, {})
        per_user[user] = per_user.get(user, 0) + 1
    return tally

# Poll `system.clients` every `wait_s` seconds until no row has the
# username 'anonymous'; raise RuntimeError once `timeout_s` seconds elapse.
def wait_until_all_connections_authenticated(cql, wait_s = 1, timeout_s = 30):
    began = time.time()
    while time.time() - began < timeout_s:
        anonymous_rows = cql.execute("SELECT COUNT(*) FROM system.clients WHERE username='anonymous' ALLOW FILTERING").one()[0]
        if anonymous_rows == 0:
            return
        time.sleep(wait_s)

    raise RuntimeError(f"Awaiting for connections authentication timed out.")

# Poll `system.clients` every `wait_s` seconds until every CQL connection
# row of `user` reports scheduling group `sl:<scheduling_group>`; raise
# RuntimeError once `timeout_s` seconds elapse. A poll that returns no rows
# for the user counts as success.
def wait_for_scheduling_group_assignment(cql, user, scheduling_group, wait_s = 2, timeout_s = 60):
    expected_group = f"sl:{scheduling_group}"
    query = f"SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' AND username='{user}' ALLOW FILTERING"

    began = time.time()
    while time.time() - began < timeout_s:
        # all() stops at the first mismatching row, like the original break.
        if all(row[1] == expected_group for row in cql.execute(query)):
            return
        time.sleep(wait_s)

    raise RuntimeError(f"Awaiting for user '{user}' to switch tenant to scheduling group '{scheduling_group}' timed out.")

# Test that `/service_levels/count_connections` reports the number of
# opened CQL connections per scheduling group, per user.
def test_count_opened_cql_connections(cql):
    role = f"test_user_{unique_name()}"
    level = f"sl_{unique_name()}"
    group = f"sl:{level}"

    # Create the role and its service level, then bind them together.
    for setup_stmt in (
        f"CREATE ROLE {role} WITH login = true AND password='{role}'",
        f"CREATE SERVICE LEVEL {level} WITH shares = 100",
        f"ATTACH SERVICE LEVEL {level} TO {role}",
    ):
        cql.execute(setup_stmt)

    # The service level controller refreshes on a 10 second interval, so
    # wait for the new service level to be assigned to the role.
    time.sleep(10)
    try:
        # The session exists only so that the role has an open connection.
        with new_session(cql, role):
            wait_until_all_connections_authenticated(cql)
            wait_for_scheduling_group_assignment(cql, role, level)

            from_api = count_opened_connections(cql)
            assert group in from_api
            assert role in from_api[group]

            # The REST answer must match what system.clients itself shows.
            from_table = count_opened_connections_from_table(cql)
            assert from_api == from_table
    finally:
        for cleanup_stmt in (
            f"DETACH SERVICE LEVEL FROM {role}",
            f"DROP ROLE {role}",
            f"DROP SERVICE LEVEL {level}",
        ):
            cql.execute(cleanup_stmt)

# Test that `/service_levels/switch_tenants` updates the scheduling group
# of CQL connections without restarting them.
#
# The test creates a test user and two service levels, `sl1` and `sl2`.
# First the user is attached to `sl1` and a connection is opened for it.
# Then the user's service level is changed to `sl2` and the
# `/service_levels/switch_tenants` endpoint is called.
def test_switch_tenants(cql):
    role = f"test_user_{unique_name()}"
    first_sl = f"sl1_{unique_name()}"
    second_sl = f"sl2_{unique_name()}"
    # Query reused before and after the switch to read the role's groups.
    groups_query = f"SELECT scheduling_group FROM system.clients WHERE username='{role}' ALLOW FILTERING"

    cql.execute(f"CREATE ROLE {role} WITH login = true AND password='{role}'")
    cql.execute(f"CREATE SERVICE LEVEL {first_sl} WITH shares = 100")
    cql.execute(f"CREATE SERVICE LEVEL {second_sl} WITH shares = 200")
    cql.execute(f"ATTACH SERVICE LEVEL {first_sl} TO {role}")

    # The service level controller refreshes on a 10 second interval, so
    # wait for the first service level to be assigned to the role.
    time.sleep(10)
    try:
        # The session exists only so that the role has an open connection.
        with new_session(cql, role):
            wait_until_all_connections_authenticated(cql)
            wait_for_scheduling_group_assignment(cql, role, first_sl)

            for conn in cql.execute(groups_query):
                assert conn[0] == f"sl:{first_sl}"

            cql.execute(f"DETACH SERVICE LEVEL FROM {role}")
            cql.execute(f"ATTACH SERVICE LEVEL {second_sl} TO {role}")
            # Again wait for the service level controller to notice the change.
            time.sleep(10)

            switch_tenants(cql)
            wait_for_scheduling_group_assignment(cql, role, second_sl)

            after_switch = cql.execute(groups_query)
            # Printed before the assertions so the REST view is in the output.
            print(count_opened_connections(cql))
            for conn in after_switch:
                assert conn[0] == f"sl:{second_sl}"
    finally:
        cql.execute(f"DETACH SERVICE LEVEL FROM {role}")
        cql.execute(f"DROP ROLE {role}")
        cql.execute(f"DROP SERVICE LEVEL {first_sl}")
        cql.execute(f"DROP SERVICE LEVEL {second_sl}")