From 49f5fc0e70310118a8687ead409c317d2e4ce73f Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Fri, 13 Dec 2024 09:31:35 +0100 Subject: [PATCH] api: introduce service levels specific API Introduces two endpoints with operations specific to service levels: - switch_tenants: updates the scheduling group of all connections to be aligned with the service level specific to the logged in user. This is mostly legacy API, as with service levels on raft this is done automatically. - count_connections: for each user and for each scheduling group, counts how many connections are assigned to that user and scheduling group. This API is used in tests. --- api/CMakeLists.txt | 2 + api/api-doc/service_levels.json | 56 ++++++++++ api/api.cc | 7 ++ api/api_init.hh | 5 + api/service_levels.cc | 63 ++++++++++++ api/service_levels.hh | 17 +++ configure.py | 2 + main.cc | 3 + test/cqlpy/test_service_level_api.py | 148 +++++++++++++++++++++++++++ 9 files changed, 303 insertions(+) create mode 100644 api/api-doc/service_levels.json create mode 100644 api/service_levels.cc create mode 100644 api/service_levels.hh create mode 100644 test/cqlpy/test_service_level_api.py diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index e1e1a1c9ba..2a6a3fab6f 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -42,6 +42,7 @@ set(swagger_files api-doc/messaging_service.json api-doc/metrics.json api-doc/raft.json + api-doc/service_levels.json api-doc/storage_proxy.json api-doc/storage_service.json api-doc/stream_manager.json @@ -82,6 +83,7 @@ target_sources(api lsa.cc messaging_service.cc raft.cc + service_levels.cc storage_proxy.cc storage_service.cc stream_manager.cc diff --git a/api/api-doc/service_levels.json b/api/api-doc/service_levels.json new file mode 100644 index 0000000000..58c1cf96bd --- /dev/null +++ b/api/api-doc/service_levels.json @@ -0,0 +1,56 @@ +{ + "apiVersion":"0.0.1", + "swaggerVersion":"1.2", + "basePath":"{{Protocol}}://{{Host}}", + "resourcePath":"/service_levels", + "produces":[ + "application/json" + ], + "apis":[ + { + "path":"/service_levels/switch_tenants", + "operations":[ + { + "method":"POST", + "summary":"Switch tenants on all opened connections if needed", + "type":"void", + "nickname":"do_switch_tenants", + "produces":[ + "application/json" + ], + "parameters":[] + } + ] + }, + { + "path":"/service_levels/count_connections", + "operations":[ + { + "method":"GET", + "summary":"Count opened CQL connections per scheduling group per user", + "type":"connections_count_map", + "nickname":"count_connections", + "produces":[ + "application/json" + ], + "parameters":[] + } + ] + } + ], + "models":{}, + "components": { + "schemas": { + "connections_count_map": { + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": { + "type": "integer" + } + } + } + } + } + +} \ No newline at end of file diff --git a/api/api.cc b/api/api.cc index f3ba941f07..ff5c06432d 100644 --- a/api/api.cc +++ b/api/api.cc @@ -36,6 +36,7 @@ #include "tasks.hh" #include "raft.hh" #include "gms/gossip_address_map.hh" +#include "service_levels.hh" logging::logger apilog("api"); @@ -358,6 +359,12 @@ future<> unset_server_cql_server_test(http_context& ctx) { #endif +future<> set_server_service_levels(http_context &ctx, cql_transport::controller& ctl, sharded& qp) { + return register_api(ctx, "service_levels", "The service levels API", [&ctl, &qp] (http_context& ctx, routes& r) { + set_service_levels(ctx, r, ctl, qp); + }); +} + future<> set_server_tasks_compaction_module(http_context& ctx, sharded& ss, sharded& snap_ctl) { auto rb = std::make_shared < api_registry_builder > (ctx.api_doc); diff --git a/api/api_init.hh b/api/api_init.hh index 236600c797..435f637c0c 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -73,6 +73,10 @@ namespace tasks { class task_manager; } +namespace cql3 { +class query_processor; +} + namespace api { struct http_context { @@ -141,6 +145,7 @@ future<> set_format_selector(http_context& ctx, db::sstables_format_selector& se future<> unset_format_selector(http_context& ctx); future<> set_server_cql_server_test(http_context& ctx, cql_transport::controller& ctl); future<> unset_server_cql_server_test(http_context& ctx); +future<> set_server_service_levels(http_context& ctx, cql_transport::controller& ctl, sharded& qp); future<> set_server_commitlog(http_context& ctx, sharded&); future<> unset_server_commitlog(http_context& ctx); diff --git a/api/service_levels.cc b/api/service_levels.cc new file mode 100644 index 0000000000..753a6e3198 --- /dev/null +++ b/api/service_levels.cc @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "service_levels.hh" +#include "api/api-doc/service_levels.json.hh" +#include "cql3/query_processor.hh" +#include "cql3/untyped_result_set.hh" +#include "db/consistency_level_type.hh" +#include "seastar/json/json_elements.hh" +#include "transport/controller.hh" +#include + + +namespace api { + +namespace sl = httpd::service_levels_json; +using namespace json; +using namespace seastar::httpd; + + +void set_service_levels(http_context& ctx, routes& r, cql_transport::controller& ctl, sharded& qp) { + sl::do_switch_tenants.set(r, [&ctl] (std::unique_ptr req) -> future { + co_await ctl.update_connections_scheduling_group(); + co_return json_void(); + }); + + sl::count_connections.set(r, [&qp] (std::unique_ptr req) -> future { + auto connections = co_await qp.local().execute_internal( + "SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' ALLOW FILTERING", + db::consistency_level::LOCAL_ONE, + cql3::query_processor::cache_internal::no + ); + + using connections_per_user = std::unordered_map; + using connections_per_scheduling_group = std::unordered_map; + connections_per_scheduling_group result; + + for (auto it = connections->begin(); it != connections->end(); it++) { + auto user = it->get_as("username"); + auto shg = it->get_as("scheduling_group"); + + if (result.contains(shg)) { + result[shg][user]++; + } + else { + result[shg] = {{user, 1}}; + } + } + + co_return result; + }); + +} + + + + +} \ No newline at end of file diff --git a/api/service_levels.hh b/api/service_levels.hh new file mode 100644 index 0000000000..e2e3993774 --- /dev/null +++ b/api/service_levels.hh @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "api.hh" + +namespace api { + +void set_service_levels(http_context& ctx, httpd::routes& r, cql_transport::controller& ctl, sharded& qp); + +} \ No newline at end of file diff --git a/configure.py b/configure.py index 5960768e4f..68c05bad79 100755 --- a/configure.py +++ b/configure.py @@ -1215,6 +1215,8 @@ api = ['api/api.cc', Json2Code('api/api-doc/raft.json'), Json2Code('api/api-doc/cql_server_test.json'), 'api/cql_server_test.cc', + 'api/service_levels.cc', + Json2Code('api/api-doc/service_levels.json'), ] alternator = [ diff --git a/main.cc b/main.cc index bc8980cbe8..cd4da43c52 100644 --- a/main.cc +++ b/main.cc @@ -2286,6 +2286,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Register controllers after drain_on_shutdown() below, so that even on start // failure drain is called and stops controllers cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, cql_sg_stats_key, maintenance_socket_enabled::no, dbcfg.statement_scheduling_group); + + api::set_server_service_levels(ctx, cql_server_ctl, qp).get(); + alternator::controller alternator_ctl(gossiper, proxy, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, *cfg, dbcfg.statement_scheduling_group); redis::controller redis_ctl(proxy, auth_service, mm, *cfg, gossiper, dbcfg.statement_scheduling_group); diff --git a/test/cqlpy/test_service_level_api.py b/test/cqlpy/test_service_level_api.py new file mode 100644 index 0000000000..6a6ae25d9c --- /dev/null +++ b/test/cqlpy/test_service_level_api.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# Copyright 2024-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + +######################################## +# Tests for the service levels HTTP API. +######################################## + +import pytest +from .rest_api import get_request, post_request +from .util import new_session, unique_name +import time + +def count_opened_connections(cql, retry_unauthenticated=True): + response = get_request(cql, "service_levels/count_connections") + return response + +def switch_tenants(cql): + return post_request(cql, "service_levels/switch_tenants") + +def count_opened_connections_from_table(cql): + connections = cql.execute("SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' ALLOW FILTERING") + result = {} + for row in connections: + user = row[0] + shg = row[1] + + if shg in result: + if user in result[shg]: + result[shg][user] += 1 + else: + result[shg][user] = 1 + else: + result[shg] = {user: 1} + + return result + +def wait_until_all_connections_authenticated(cql, wait_s = 1, timeout_s = 30): + start_time = time.time() + while time.time() - start_time < timeout_s: + result = cql.execute("SELECT COUNT(*) FROM system.clients WHERE username='anonymous' ALLOW FILTERING") + if result.one()[0] == 0: + return + else: + time.sleep(wait_s) + + raise RuntimeError(f"Awaiting for connections authentication timed out.") + +def wait_for_scheduling_group_assignment(cql, user, scheduling_group, wait_s = 2, timeout_s = 60): + start_time = time.time() + while time.time() - start_time < timeout_s: + connections = cql.execute(f"SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' AND username='{user}' ALLOW FILTERING") + + require_wait = False + for row in connections: + if row[1] != f"sl:{scheduling_group}": + require_wait = True + break + if require_wait: + time.sleep(wait_s) + continue + return + + raise RuntimeError(f"Awaiting for user '{user}' to switch tenant to scheduling group '{scheduling_group}' timed out.") + +# Test if `/service_levels/count_connections` prints counted CQL connections +# per scheduling group per user. +def test_count_opened_cql_connections(cql): + user = f"test_user_{unique_name()}" + sl = f"sl_{unique_name()}" + + cql.execute(f"CREATE ROLE {user} WITH login = true AND password='{user}'") + cql.execute(f"CREATE SERVICE LEVEL {sl} WITH shares = 100") + cql.execute(f"ATTACH SERVICE LEVEL {sl} TO {user}") + + # Service level controller updates in 10 seconds interval, so wait + # for sl1 to be assgined to test_user + time.sleep(10) + try: + with new_session(cql, user): # new sessions is created only to create user's connection to Scylla + wait_until_all_connections_authenticated(cql) + wait_for_scheduling_group_assignment(cql, user, sl) + + api_response = count_opened_connections(cql) + assert f"sl:{sl}" in api_response + assert user in api_response[f"sl:{sl}"] + + table_response = count_opened_connections_from_table(cql) + assert api_response == table_response + finally: + cql.execute(f"DETACH SERVICE LEVEL FROM {user}") + cql.execute(f"DROP ROLE {user}") + cql.execute(f"DROP SERVICE LEVEL {sl}") + +# Test if `/service_levels/switch_tenants` updates scheduling group +# of CQL connections without restarting them. +# +# This test creates a `test_user` and 2 service levels `sl1` and `sl2`. +# Firstly the user is assigned to `sl1` and his connections is created. +# Then the test changes user's service level to `sl2` and +# `/service_levels/switch_tenants` endpoint is called. +def test_switch_tenants(cql): + user = f"test_user_{unique_name()}" + sl1 = f"sl1_{unique_name()}" + sl2 = f"sl2_{unique_name()}" + + + cql.execute(f"CREATE ROLE {user} WITH login = true AND password='{user}'") + cql.execute(f"CREATE SERVICE LEVEL {sl1} WITH shares = 100") + cql.execute(f"CREATE SERVICE LEVEL {sl2} WITH shares = 200") + cql.execute(f"ATTACH SERVICE LEVEL {sl1} TO {user}") + + # Service level controller updates in 10 seconds interval, so wait + # for sl1 to be assgined to test_user + time.sleep(10) + try: + with new_session(cql, user): # new sessions is created only to create user's connection to Scylla + wait_until_all_connections_authenticated(cql) + wait_for_scheduling_group_assignment(cql, user, sl1) + + user_connections_sl1 = cql.execute(f"SELECT scheduling_group FROM system.clients WHERE username='{user}' ALLOW FILTERING") + for conn in user_connections_sl1: + assert conn[0] == f"sl:{sl1}" + + cql.execute(f"DETACH SERVICE LEVEL FROM {user}") + cql.execute(f"ATTACH SERVICE LEVEL {sl2} TO {user}") + # Again wait for service level controller to notice the change + time.sleep(10) + + switch_tenants(cql) + wait_for_scheduling_group_assignment(cql, user, sl2) + + user_connections_sl2 = cql.execute(f"SELECT scheduling_group FROM system.clients WHERE username='{user}' ALLOW FILTERING") + print(count_opened_connections(cql)) + for conn in user_connections_sl2: + assert conn[0] == f"sl:{sl2}" + finally: + cql.execute(f"DETACH SERVICE LEVEL FROM {user}") + cql.execute(f"DROP ROLE {user}") + cql.execute(f"DROP SERVICE LEVEL {sl1}") + cql.execute(f"DROP SERVICE LEVEL {sl2}") + + + + + +