api: introduce service levels specific API

Introduces two endpoints with operations specific to service levels:

- switch_tenants: updates the scheduling group of all connections to be
  aligned with the service level specific to the logged in user. This is
  mostly legacy API, as with service levels on raft this is done
  automatically.
- count_connections: for each user and for each scheduling group, counts
  how many connections are assigned to that user and scheduling group.
  This API is used in tests.
This commit is contained in:
Piotr Dulikowski
2024-12-13 09:31:35 +01:00
parent a65c0c3735
commit 49f5fc0e70
9 changed files with 303 additions and 0 deletions

View File

@@ -42,6 +42,7 @@ set(swagger_files
api-doc/messaging_service.json
api-doc/metrics.json
api-doc/raft.json
api-doc/service_levels.json
api-doc/storage_proxy.json
api-doc/storage_service.json
api-doc/stream_manager.json
@@ -82,6 +83,7 @@ target_sources(api
lsa.cc
messaging_service.cc
raft.cc
service_levels.cc
storage_proxy.cc
storage_service.cc
stream_manager.cc

View File

@@ -0,0 +1,56 @@
{
"apiVersion":"0.0.1",
"swaggerVersion":"1.2",
"basePath":"{{Protocol}}://{{Host}}",
"resourcePath":"/service_levels",
"produces":[
"application/json"
],
"apis":[
{
"path":"/service_levels/switch_tenants",
"operations":[
{
"method":"POST",
"summary":"Switch tenants on all opened connections if needed",
"type":"void",
"nickname":"do_switch_tenants",
"produces":[
"application/json"
],
"parameters":[]
}
]
},
{
"path":"/service_levels/count_connections",
"operations":[
{
"method":"GET",
"summary":"Count opened CQL connections per scheduling group per user",
"type":"connections_count_map",
"nickname":"count_connections",
"produces":[
"application/json"
],
"parameters":[]
}
]
}
],
"models":{},
"components": {
"schemas": {
"connections_count_map": {
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": {
"type": "integer"
}
}
}
}
}
}

View File

@@ -36,6 +36,7 @@
#include "tasks.hh"
#include "raft.hh"
#include "gms/gossip_address_map.hh"
#include "service_levels.hh"
logging::logger apilog("api");
@@ -358,6 +359,12 @@ future<> unset_server_cql_server_test(http_context& ctx) {
#endif
future<> set_server_service_levels(http_context &ctx, cql_transport::controller& ctl, sharded<cql3::query_processor>& qp) {
return register_api(ctx, "service_levels", "The service levels API", [&ctl, &qp] (http_context& ctx, routes& r) {
set_service_levels(ctx, r, ctl, qp);
});
}
future<> set_server_tasks_compaction_module(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl) {
auto rb = std::make_shared < api_registry_builder > (ctx.api_doc);

View File

@@ -73,6 +73,10 @@ namespace tasks {
class task_manager;
}
namespace cql3 {
class query_processor;
}
namespace api {
struct http_context {
@@ -141,6 +145,7 @@ future<> set_format_selector(http_context& ctx, db::sstables_format_selector& se
future<> unset_format_selector(http_context& ctx);
future<> set_server_cql_server_test(http_context& ctx, cql_transport::controller& ctl);
future<> unset_server_cql_server_test(http_context& ctx);
future<> set_server_service_levels(http_context& ctx, cql_transport::controller& ctl, sharded<cql3::query_processor>& qp);
future<> set_server_commitlog(http_context& ctx, sharded<replica::database>&);
future<> unset_server_commitlog(http_context& ctx);

63
api/service_levels.cc Normal file
View File

@@ -0,0 +1,63 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "service_levels.hh"
#include "api/api-doc/service_levels.json.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "seastar/json/json_elements.hh"
#include "transport/controller.hh"
#include <unordered_map>
namespace api {
namespace sl = httpd::service_levels_json;
using namespace json;
using namespace seastar::httpd;
void set_service_levels(http_context& ctx, routes& r, cql_transport::controller& ctl, sharded<cql3::query_processor>& qp) {
sl::do_switch_tenants.set(r, [&ctl] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
co_await ctl.update_connections_scheduling_group();
co_return json_void();
});
sl::count_connections.set(r, [&qp] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto connections = co_await qp.local().execute_internal(
"SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' ALLOW FILTERING",
db::consistency_level::LOCAL_ONE,
cql3::query_processor::cache_internal::no
);
using connections_per_user = std::unordered_map<sstring, uint64_t>;
using connections_per_scheduling_group = std::unordered_map<sstring, connections_per_user>;
connections_per_scheduling_group result;
for (auto it = connections->begin(); it != connections->end(); it++) {
auto user = it->get_as<sstring>("username");
auto shg = it->get_as<sstring>("scheduling_group");
if (result.contains(shg)) {
result[shg][user]++;
}
else {
result[shg] = {{user, 1}};
}
}
co_return result;
});
}
}

17
api/service_levels.hh Normal file
View File

@@ -0,0 +1,17 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "api.hh"
namespace api {
void set_service_levels(http_context& ctx, httpd::routes& r, cql_transport::controller& ctl, sharded<cql3::query_processor>& qp);
}

View File

@@ -1215,6 +1215,8 @@ api = ['api/api.cc',
Json2Code('api/api-doc/raft.json'),
Json2Code('api/api-doc/cql_server_test.json'),
'api/cql_server_test.cc',
'api/service_levels.cc',
Json2Code('api/api-doc/service_levels.json'),
]
alternator = [

View File

@@ -2286,6 +2286,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// Register controllers after drain_on_shutdown() below, so that even on start
// failure drain is called and stops controllers
cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, cql_sg_stats_key, maintenance_socket_enabled::no, dbcfg.statement_scheduling_group);
api::set_server_service_levels(ctx, cql_server_ctl, qp).get();
alternator::controller alternator_ctl(gossiper, proxy, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, *cfg, dbcfg.statement_scheduling_group);
redis::controller redis_ctl(proxy, auth_service, mm, *cfg, gossiper, dbcfg.statement_scheduling_group);

View File

@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
########################################
# Tests for the service levels HTTP API.
########################################
import pytest
from .rest_api import get_request, post_request
from .util import new_session, unique_name
import time
def count_opened_connections(cql, retry_unauthenticated=True):
response = get_request(cql, "service_levels/count_connections")
return response
def switch_tenants(cql):
return post_request(cql, "service_levels/switch_tenants")
def count_opened_connections_from_table(cql):
connections = cql.execute("SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' ALLOW FILTERING")
result = {}
for row in connections:
user = row[0]
shg = row[1]
if shg in result:
if user in result[shg]:
result[shg][user] += 1
else:
result[shg][user] = 1
else:
result[shg] = {user: 1}
return result
def wait_until_all_connections_authenticated(cql, wait_s = 1, timeout_s = 30):
start_time = time.time()
while time.time() - start_time < timeout_s:
result = cql.execute("SELECT COUNT(*) FROM system.clients WHERE username='anonymous' ALLOW FILTERING")
if result.one()[0] == 0:
return
else:
time.sleep(wait_s)
raise RuntimeError(f"Awaiting for connections authentication timed out.")
def wait_for_scheduling_group_assignment(cql, user, scheduling_group, wait_s = 2, timeout_s = 60):
start_time = time.time()
while time.time() - start_time < timeout_s:
connections = cql.execute(f"SELECT username, scheduling_group FROM system.clients WHERE client_type='cql' AND username='{user}' ALLOW FILTERING")
require_wait = False
for row in connections:
if row[1] != f"sl:{scheduling_group}":
require_wait = True
break
if require_wait:
time.sleep(wait_s)
continue
return
raise RuntimeError(f"Awaiting for user '{user}' to switch tenant to scheduling group '{scheduling_group}' timed out.")
# Test if `/service_levels/count_connections` prints counted CQL connections
# per scheduling group per user.
def test_count_opened_cql_connections(cql):
user = f"test_user_{unique_name()}"
sl = f"sl_{unique_name()}"
cql.execute(f"CREATE ROLE {user} WITH login = true AND password='{user}'")
cql.execute(f"CREATE SERVICE LEVEL {sl} WITH shares = 100")
cql.execute(f"ATTACH SERVICE LEVEL {sl} TO {user}")
# Service level controller updates in 10 seconds interval, so wait
# for sl1 to be assgined to test_user
time.sleep(10)
try:
with new_session(cql, user): # new sessions is created only to create user's connection to Scylla
wait_until_all_connections_authenticated(cql)
wait_for_scheduling_group_assignment(cql, user, sl)
api_response = count_opened_connections(cql)
assert f"sl:{sl}" in api_response
assert user in api_response[f"sl:{sl}"]
table_response = count_opened_connections_from_table(cql)
assert api_response == table_response
finally:
cql.execute(f"DETACH SERVICE LEVEL FROM {user}")
cql.execute(f"DROP ROLE {user}")
cql.execute(f"DROP SERVICE LEVEL {sl}")
# Test if `/service_levels/switch_tenants` updates scheduling group
# of CQL connections without restarting them.
#
# This test creates a `test_user` and 2 service levels `sl1` and `sl2`.
# Firstly the user is assigned to `sl1` and his connections is created.
# Then the test changes user's service level to `sl2` and
# `/service_levels/switch_tenants` endpoint is called.
def test_switch_tenants(cql):
user = f"test_user_{unique_name()}"
sl1 = f"sl1_{unique_name()}"
sl2 = f"sl2_{unique_name()}"
cql.execute(f"CREATE ROLE {user} WITH login = true AND password='{user}'")
cql.execute(f"CREATE SERVICE LEVEL {sl1} WITH shares = 100")
cql.execute(f"CREATE SERVICE LEVEL {sl2} WITH shares = 200")
cql.execute(f"ATTACH SERVICE LEVEL {sl1} TO {user}")
# Service level controller updates in 10 seconds interval, so wait
# for sl1 to be assgined to test_user
time.sleep(10)
try:
with new_session(cql, user): # new sessions is created only to create user's connection to Scylla
wait_until_all_connections_authenticated(cql)
wait_for_scheduling_group_assignment(cql, user, sl1)
user_connections_sl1 = cql.execute(f"SELECT scheduling_group FROM system.clients WHERE username='{user}' ALLOW FILTERING")
for conn in user_connections_sl1:
assert conn[0] == f"sl:{sl1}"
cql.execute(f"DETACH SERVICE LEVEL FROM {user}")
cql.execute(f"ATTACH SERVICE LEVEL {sl2} TO {user}")
# Again wait for service level controller to notice the change
time.sleep(10)
switch_tenants(cql)
wait_for_scheduling_group_assignment(cql, user, sl2)
user_connections_sl2 = cql.execute(f"SELECT scheduling_group FROM system.clients WHERE username='{user}' ALLOW FILTERING")
print(count_opened_connections(cql))
for conn in user_connections_sl2:
assert conn[0] == f"sl:{sl2}"
finally:
cql.execute(f"DETACH SERVICE LEVEL FROM {user}")
cql.execute(f"DROP ROLE {user}")
cql.execute(f"DROP SERVICE LEVEL {sl1}")
cql.execute(f"DROP SERVICE LEVEL {sl2}")