mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
service/qos: adding service level controller
adding the service level controller implementation. The implementation follows the design in: https://docs.google.com/document/d/1RrSTZ3ZX86-YDt2POwAVwFeKN9uX8frEvATJda5n1FU/edit?usp=sharing Some interfaces were added for registration with system componnents. The method of registration is chosen over a constructor parameter, due to the componnets being initialized prior to the service level controller being created. Message-Id: <e9c4e7d5b411062b6a553f5c6861e7875cd71d2c.1609171761.git.sarna@scylladb.com>
This commit is contained in:
committed by
Piotr Sarna
parent
3ecdab30a1
commit
a54ea4667b
@@ -837,6 +837,7 @@ scylla_core = (['database.cc',
|
||||
'service/pager/paging_state.cc',
|
||||
'service/pager/query_pagers.cc',
|
||||
'service/qos/qos_common.cc',
|
||||
'service/qos/service_level_controller.cc',
|
||||
'streaming/stream_task.cc',
|
||||
'streaming/stream_session.cc',
|
||||
'streaming/stream_request.cc',
|
||||
|
||||
360
service/qos/service_level_controller.cc
Normal file
360
service/qos/service_level_controller.cc
Normal file
@@ -0,0 +1,360 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <algorithm>
|
||||
#include "service_level_controller.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
|
||||
namespace qos {
|
||||
|
||||
sstring service_level_controller::default_service_level_name = "default";
|
||||
|
||||
|
||||
|
||||
service_level_controller::service_level_controller(sharded<auth::service>& auth_service, service_level_options default_service_level_config):
|
||||
_sl_data_accessor(nullptr),
|
||||
_auth_service(auth_service)
|
||||
{
|
||||
if (this_shard_id() == global_controller) {
|
||||
_global_controller_db = std::make_unique<global_controller_data>();
|
||||
_global_controller_db->default_service_level_config = default_service_level_config;
|
||||
}
|
||||
}
|
||||
|
||||
future<> service_level_controller::add_service_level(sstring name, service_level_options slo, bool is_static) {
|
||||
return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) {
|
||||
return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, slo, is_static] () {
|
||||
return sl_controller.do_add_service_level(name, slo, is_static);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::remove_service_level(sstring name, bool remove_static) {
|
||||
return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) {
|
||||
return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, remove_static] () {
|
||||
return sl_controller.do_remove_service_level(name, remove_static);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::start() {
|
||||
if (this_shard_id() != global_controller) {
|
||||
return make_ready_future();
|
||||
}
|
||||
return with_semaphore(_global_controller_db->notifications_serializer, 1, [this] () {
|
||||
return do_add_service_level(default_service_level_name, _global_controller_db->default_service_level_config, true).then([this] () {
|
||||
return container().invoke_on_all([] (service_level_controller& sl) {
|
||||
sl._default_service_level = sl.get_service_level(default_service_level_name);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
void service_level_controller::set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor) {
|
||||
// unregistering the accessor is always legal
|
||||
if (!sl_data_accessor) {
|
||||
_sl_data_accessor = nullptr;
|
||||
}
|
||||
|
||||
// Registration of a new accessor can be done only when the _sl_data_accessor is not already set.
|
||||
// This behavior is intended to allow to unit testing debug to set this value without having
|
||||
// overriden by storage_proxy
|
||||
if (!_sl_data_accessor) {
|
||||
_sl_data_accessor = sl_data_accessor;
|
||||
}
|
||||
}
|
||||
|
||||
future<> service_level_controller::stop() {
|
||||
// unregister from the service level distributed data accessor.
|
||||
_sl_data_accessor = nullptr;
|
||||
if (this_shard_id() == global_controller) {
|
||||
// abort the loop of the distributed data checking if it is running
|
||||
_global_controller_db->dist_data_update_aborter.request_abort();
|
||||
_global_controller_db->notifications_serializer.broken();
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> service_level_controller::update_service_levels_from_distributed_data() {
|
||||
|
||||
if (!_sl_data_accessor) {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
return container().invoke_on(global_controller, [] (service_level_controller& sl_controller) {
|
||||
return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller] () {
|
||||
return async([&sl_controller] () {
|
||||
service_levels_info service_levels = sl_controller._sl_data_accessor->get_service_levels().get0();
|
||||
service_levels_info service_levels_for_add_or_update;
|
||||
service_levels_info service_levels_for_delete;
|
||||
|
||||
auto current_it = sl_controller._service_levels_db.begin();
|
||||
auto new_state_it = service_levels.begin();
|
||||
|
||||
// we want to detect 3 kinds of objects in one pass -
|
||||
// 1. new service levels that have been added to the distributed keyspace
|
||||
// 2. existing service levels that have changed
|
||||
// 3. removed service levels
|
||||
// this loop is batching together add/update operation and remove operation
|
||||
// then they are all executed together.The reason for this is to allow for
|
||||
// firstly delete all that there is to be deleted and only then adding new
|
||||
// service levels.
|
||||
while (current_it != sl_controller._service_levels_db.end() && new_state_it != service_levels.end()) {
|
||||
if (current_it->first == new_state_it->first) {
|
||||
//the service level exists on both the cureent and new state.
|
||||
if (current_it->second.slo != new_state_it->second) {
|
||||
// The service level configuration is different
|
||||
// in the new state and the old state, meaning it needs to be updated.
|
||||
service_levels_for_add_or_update.insert(*new_state_it);
|
||||
}
|
||||
current_it++;
|
||||
new_state_it++;
|
||||
} else if (current_it->first < new_state_it->first) {
|
||||
//The service level does not exists in the new state so it needs to be
|
||||
//removed, but only if it is not static since static configurations dont
|
||||
//come from the distributed keyspace but from code.
|
||||
if (!current_it->second.is_static) {
|
||||
service_levels_for_delete.emplace(current_it->first, current_it->second.slo);
|
||||
}
|
||||
current_it++;
|
||||
} else { /*new_it->first < current_it->first */
|
||||
// The service level exits in the new state but not in the old state
|
||||
// so it needs to be added.
|
||||
service_levels_for_add_or_update.insert(*new_state_it);
|
||||
new_state_it++;
|
||||
}
|
||||
}
|
||||
|
||||
for (; current_it != sl_controller._service_levels_db.end(); current_it++) {
|
||||
service_levels_for_delete.emplace(current_it->first, current_it->second.slo);
|
||||
}
|
||||
std::copy(new_state_it, service_levels.end(), std::inserter(service_levels_for_add_or_update,
|
||||
service_levels_for_add_or_update.end()));
|
||||
|
||||
for (auto&& sl : service_levels_for_delete) {
|
||||
sl_controller.do_remove_service_level(sl.first, false).get();
|
||||
}
|
||||
for (auto&& sl : service_levels_for_add_or_update) {
|
||||
sl_controller.do_add_service_level(sl.first, sl.second).get();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<sstring> service_level_controller::find_service_level(auth::role_set roles) {
|
||||
// FIXME: this arbitrary order based on names is OK for now, because the service levels do not yet
|
||||
// have any parameters. Once they do, this comparison could be based on comparing the parameters.
|
||||
static auto sl_compare = std::less<sstring>();
|
||||
auto& role_manager = _auth_service.local().underlying_role_manager();
|
||||
|
||||
// converts a list of roles into the chosen service level.
|
||||
return ::map_reduce(roles.begin(), roles.end(), [&role_manager, this] (const sstring& role) {
|
||||
return role_manager.get_attribute(role, "service_level").then_wrapped([this, role] (future<std::optional<sstring>> sl_name_fut) {
|
||||
try {
|
||||
std::optional<sstring> sl_name = sl_name_fut.get0();
|
||||
if (! sl_name) {
|
||||
return sl_name;
|
||||
}
|
||||
auto sl_it = _service_levels_db.find(*sl_name);
|
||||
if ( sl_it == _service_levels_db.end()) {
|
||||
return std::optional<sstring>{};
|
||||
} else {
|
||||
return sl_name;
|
||||
}
|
||||
} catch(...) { // when we fail, we act as if the attribute does not exist so the node
|
||||
// will not be brought down.
|
||||
return std::optional<sstring>{};
|
||||
}
|
||||
});
|
||||
}, std::optional<sstring>{}, [this] (std::optional<sstring> first, std::optional<sstring> second) {
|
||||
if (!second) {
|
||||
return first;
|
||||
} else if (!first) {
|
||||
return second;
|
||||
} else {
|
||||
return std::optional<sstring>{ sl_compare(*first, *second) ? second : first };
|
||||
}
|
||||
}).then([] (std::optional<sstring> sl) {
|
||||
return sl? *sl:default_service_level_name;
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::notify_service_level_added(sstring name, service_level sl_data) {
|
||||
_service_levels_db.emplace(name, sl_data);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> service_level_controller::notify_service_level_updated(sstring name, service_level_options slo) {
|
||||
auto sl_it = _service_levels_db.find(name);
|
||||
future<> f = make_ready_future();
|
||||
if (sl_it != _service_levels_db.end()) {
|
||||
sl_it->second.slo = slo;
|
||||
}
|
||||
return f;
|
||||
}
|
||||
|
||||
future<> service_level_controller::notify_service_level_removed(sstring name) {
|
||||
auto sl_it = _service_levels_db.find(name);
|
||||
if (sl_it != _service_levels_db.end()) {
|
||||
_service_levels_db.erase(sl_it);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void service_level_controller::update_from_distributed_data(std::chrono::duration<float> interval) {
|
||||
(void)container().invoke_on(global_controller, [interval] (service_level_controller& global_sl) {
|
||||
if (global_sl._global_controller_db->distributed_data_update.available()) {
|
||||
global_sl._global_controller_db->distributed_data_update = repeat([interval, &global_sl] {
|
||||
return sleep_abortable<steady_clock_type>(std::chrono::duration_cast<steady_clock_type::duration>(interval),
|
||||
global_sl._global_controller_db->dist_data_update_aborter).then_wrapped([&global_sl] (future<>&& f) {
|
||||
try {
|
||||
f.get();
|
||||
return global_sl.update_service_levels_from_distributed_data().then([] {
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
catch (const sleep_aborted& e) {
|
||||
return make_ready_future<seastar::bool_class<seastar::stop_iteration_tag>>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exists) {
|
||||
set_service_level_op_type add_type = if_not_exists ? set_service_level_op_type::add_if_not_exists : set_service_level_op_type::add;
|
||||
return set_distributed_service_level(name, slo, add_type);
|
||||
}
|
||||
|
||||
future<> service_level_controller::alter_distributed_service_level(sstring name, service_level_options slo) {
|
||||
return set_distributed_service_level(name, slo, set_service_level_op_type::alter);
|
||||
}
|
||||
|
||||
future<> service_level_controller::drop_distributed_service_level(sstring name, bool if_exists) {
|
||||
return _sl_data_accessor->get_service_levels().then([this, name, if_exists] (qos::service_levels_info sl_info) {
|
||||
auto it = sl_info.find(name);
|
||||
if (it == sl_info.end()) {
|
||||
if (if_exists) {
|
||||
return make_ready_future();
|
||||
} else {
|
||||
return make_exception_future(nonexistant_service_level_exception(name));
|
||||
}
|
||||
} else {
|
||||
auto& role_manager = _auth_service.local().underlying_role_manager();
|
||||
return role_manager.query_attribute_for_all("service_level").then( [&role_manager, name] (auth::role_manager::attribute_vals attributes) {
|
||||
return parallel_for_each(attributes.begin(), attributes.end(), [&role_manager, name] (auto&& attr) {
|
||||
if (attr.second == name) {
|
||||
return role_manager.remove_attribute(attr.first, "service_level");
|
||||
} else {
|
||||
return make_ready_future();
|
||||
}
|
||||
});
|
||||
}).then([this, name] {
|
||||
return _sl_data_accessor->drop_service_level(name);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<service_levels_info> service_level_controller::get_distributed_service_levels() {
|
||||
return _sl_data_accessor->get_service_levels();
|
||||
}
|
||||
|
||||
future<service_levels_info> service_level_controller::get_distributed_service_level(sstring service_level_name) {
|
||||
return _sl_data_accessor->get_service_level(service_level_name);
|
||||
}
|
||||
|
||||
future<> service_level_controller::set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type) {
|
||||
return _sl_data_accessor->get_service_levels().then([this, name, slo, op_type] (qos::service_levels_info sl_info) {
|
||||
auto it = sl_info.find(name);
|
||||
// test for illegal requests or requests that should terminate without any action
|
||||
if (it == sl_info.end()) {
|
||||
if (op_type == set_service_level_op_type::alter) {
|
||||
return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' desn't exist.", name)));
|
||||
}
|
||||
} else {
|
||||
if (op_type == set_service_level_op_type::add) {
|
||||
return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' already exists.", name)));
|
||||
} else if (op_type == set_service_level_op_type::add_if_not_exists) {
|
||||
return make_ready_future();
|
||||
}
|
||||
}
|
||||
return _sl_data_accessor->set_service_level(name, slo);
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::do_add_service_level(sstring name, service_level_options slo, bool is_static) {
|
||||
auto service_level_it = _service_levels_db.find(name);
|
||||
if (is_static) {
|
||||
_global_controller_db->static_configurations[name] = slo;
|
||||
}
|
||||
if (service_level_it != _service_levels_db.end()) {
|
||||
if ((is_static && service_level_it->second.is_static) || !is_static) {
|
||||
if ((service_level_it->second.is_static) && (!is_static)) {
|
||||
service_level_it->second.is_static = false;
|
||||
}
|
||||
return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, slo);
|
||||
} else {
|
||||
// this means we set static layer when the the service level
|
||||
// is running of the non static configuration. so we have nothing
|
||||
// else to do since we already saved the static configuration.
|
||||
return make_ready_future();
|
||||
}
|
||||
} else {
|
||||
return do_with(service_level{.slo = slo, .is_static = is_static}, std::move(name), [this] (service_level& sl, sstring& name) {
|
||||
return container().invoke_on_all(&service_level_controller::notify_service_level_added, name, sl);
|
||||
});
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
|
||||
auto service_level_it = _service_levels_db.find(name);
|
||||
if (service_level_it != _service_levels_db.end()) {
|
||||
auto static_conf_it = _global_controller_db->static_configurations.end();
|
||||
bool static_exists = false;
|
||||
if (remove_static) {
|
||||
_global_controller_db->static_configurations.erase(name);
|
||||
} else {
|
||||
static_conf_it = _global_controller_db->static_configurations.find(name);
|
||||
static_exists = static_conf_it != _global_controller_db->static_configurations.end();
|
||||
}
|
||||
if (remove_static && service_level_it->second.is_static) {
|
||||
return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name);
|
||||
} else if (!remove_static && !service_level_it->second.is_static) {
|
||||
if (static_exists) {
|
||||
service_level_it->second.is_static = true;
|
||||
return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, static_conf_it->second);
|
||||
} else {
|
||||
return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
}
|
||||
241
service/qos/service_level_controller.hh
Normal file
241
service/qos/service_level_controller.hh
Normal file
@@ -0,0 +1,241 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include "log.hh"
|
||||
#include "auth/role_manager.hh"
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "auth/service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include <map>
|
||||
#include <unordered_set>
|
||||
#include "qos_common.hh"
|
||||
|
||||
namespace db {
|
||||
class system_distributed_keyspace;
|
||||
}
|
||||
namespace qos {
|
||||
/**
|
||||
* a structure to hold a service level
|
||||
* data and configuration.
|
||||
*/
|
||||
struct service_level {
|
||||
service_level_options slo;
|
||||
bool marked_for_deletion;
|
||||
bool is_static;
|
||||
};
|
||||
|
||||
/**
|
||||
* The service_level_controller class is an implementation of the service level
|
||||
* controller design.
|
||||
* It is logically divided into 2 parts:
|
||||
* 1. Global controller which is responsible for all of the data and plumbing
|
||||
* manipulation.
|
||||
* 2. Local controllers that act upon the data and facilitates execution in
|
||||
* the service level context
|
||||
*/
|
||||
class service_level_controller : public peering_sharded_service<service_level_controller> {
|
||||
public:
|
||||
class service_level_distributed_data_accessor {
|
||||
public:
|
||||
virtual future<qos::service_levels_info> get_service_levels() const = 0;
|
||||
virtual future<qos::service_levels_info> get_service_level(sstring service_level_name) const = 0;
|
||||
virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const = 0;
|
||||
virtual future<> drop_service_level(sstring service_level_name) const = 0;
|
||||
};
|
||||
using service_level_distributed_data_accessor_ptr = ::shared_ptr<service_level_distributed_data_accessor>;
|
||||
private:
|
||||
struct global_controller_data {
|
||||
service_levels_info static_configurations{};
|
||||
int schedg_group_cnt = 0;
|
||||
int io_priority_cnt = 0;
|
||||
service_level_options default_service_level_config;
|
||||
// The below future is used to serialize work so no reordering can occur.
|
||||
// This is needed so for example: delete(x), add(x) will not reverse yielding
|
||||
// a completely different result than the one intended.
|
||||
future<> work_future = make_ready_future();
|
||||
semaphore notifications_serializer = semaphore(1);
|
||||
future<> distributed_data_update = make_ready_future();
|
||||
abort_source dist_data_update_aborter;
|
||||
};
|
||||
|
||||
std::unique_ptr<global_controller_data> _global_controller_db;
|
||||
|
||||
static constexpr shard_id global_controller = 0;
|
||||
|
||||
std::unordered_map<sstring, service_level> _service_levels_db;
|
||||
std::unordered_map<sstring, sstring> _role_to_service_level;
|
||||
service_level _default_service_level;
|
||||
service_level_distributed_data_accessor_ptr _sl_data_accessor;
|
||||
sharded<auth::service>& _auth_service;
|
||||
public:
|
||||
service_level_controller(sharded<auth::service>& auth_service, service_level_options default_service_level_config);
|
||||
|
||||
/**
|
||||
* this function must be called *once* from any shard before any other functions are called.
|
||||
* No other function should be called before the future returned by the function is resolved.
|
||||
* @return a future that resolves when the initialization is over.
|
||||
*/
|
||||
future<> start();
|
||||
|
||||
void set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor);
|
||||
|
||||
/**
|
||||
* Adds a service level configuration if it doesn't exists, and updates
|
||||
* an the existing one if it does exist.
|
||||
* Handles both, static and non static service level configurations.
|
||||
* @param name - the service level name.
|
||||
* @param slo - the service level configuration
|
||||
* @param is_static - is this configuration static or not
|
||||
*/
|
||||
future<> add_service_level(sstring name, service_level_options slo, bool is_static = false);
|
||||
|
||||
/**
|
||||
* Removes a service level configuration if it exists.
|
||||
* Handles both, static and non static service level configurations.
|
||||
* @param name - the service level name.
|
||||
* @param remove_static - indicates if it is a removal of a static configuration
|
||||
* or not.
|
||||
*/
|
||||
future<> remove_service_level(sstring name, bool remove_static);
|
||||
|
||||
/**
|
||||
* stops all ongoing operations if exists
|
||||
* @return a future that is resolved when all operations has stopped
|
||||
*/
|
||||
future<> stop();
|
||||
|
||||
/**
|
||||
* this is an executor of a function with arguments under a service level
|
||||
* that corresponds to a given user.
|
||||
* @param usr - the user for determining the service level
|
||||
* @param func - the function to be executed
|
||||
* @return a future that is resolved when the function's operation is resolved
|
||||
* (if it returns a future). or a ready future containing the returned value
|
||||
* from the function/
|
||||
*/
|
||||
template <typename Ret>
|
||||
futurize_t<Ret> with_user_service_level(shared_ptr<auth::authenticated_user> usr, noncopyable_function<Ret()> func) {
|
||||
if (usr) {
|
||||
auth::service& ser = _auth_service.local();
|
||||
return auth::get_roles(ser, *usr).then([this] (auth::role_set roles) {
|
||||
return find_service_level(roles);
|
||||
}).then([usr, this, func = std::move(func)] (sstring service_level_name) mutable {
|
||||
return with_service_level(service_level_name, std::move(func));
|
||||
});
|
||||
} else {
|
||||
return with_service_level(default_service_level_name, std::move(func));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Chack the distributed data for changes in a constant interval and updates
|
||||
* the service_levels configuration in accordance (adds, removes, or updates
|
||||
* service levels as necessairy).
|
||||
* @param interval - the interval is seconds to check the distributed data.
|
||||
* @return a future that is resolved when the update loop stops.
|
||||
*/
|
||||
void update_from_distributed_data(std::chrono::duration<float> interval);
|
||||
|
||||
/**
|
||||
* Updates the service level data from the distributed data store.
|
||||
* @return a future that is resolved when the update is done
|
||||
*/
|
||||
future<> update_service_levels_from_distributed_data();
|
||||
|
||||
|
||||
future<> add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exsists);
|
||||
future<> alter_distributed_service_level(sstring name, service_level_options slo);
|
||||
future<> drop_distributed_service_level(sstring name, bool if_exists);
|
||||
future<service_levels_info> get_distributed_service_levels();
|
||||
future<service_levels_info> get_distributed_service_level(sstring service_level_name);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Adds a service level configuration if it doesn't exists, and updates
|
||||
* an the existing one if it does exist.
|
||||
* Handles both, static and non static service level configurations.
|
||||
* @param name - the service level name.
|
||||
* @param slo - the service level configuration
|
||||
* @param is_static - is this configuration static or not
|
||||
*/
|
||||
future<> do_add_service_level(sstring name, service_level_options slo, bool is_static = false);
|
||||
|
||||
/**
|
||||
* Removes a service level configuration if it exists.
|
||||
* Handles both, static and non static service level configurations.
|
||||
* @param name - the service level name.
|
||||
* @param remove_static - indicates if it is a removal of a static configuration
|
||||
* or not.
|
||||
*/
|
||||
future<> do_remove_service_level(sstring name, bool remove_static);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Returns the service level **in effect** for a user having the given
|
||||
* collection of roles.
|
||||
* @param roles - the collection of roles to consider
|
||||
* @return the name of the service level in effect.
|
||||
*/
|
||||
future<sstring> find_service_level(auth::role_set roles);
|
||||
|
||||
/**
|
||||
* The notify functions are used by the global service level controller
|
||||
* to propagate configuration changes to the local controllers.
|
||||
* the returned future is resolved when the local controller is done acting
|
||||
* on the notification. updates are done in sequence so their meaning will not
|
||||
* change due to execution reordering.
|
||||
*/
|
||||
future<> notify_service_level_added(sstring name, service_level sl_data);
|
||||
future<> notify_service_level_updated(sstring name, service_level_options slo);
|
||||
future<> notify_service_level_removed(sstring name);
|
||||
|
||||
/**
|
||||
* Gets the service level data by name.
|
||||
* @param service_level_name - the name of the requested service level
|
||||
* @return the service level data if it exists (in the local controller) or
|
||||
* get_service_level("default") otherwise.
|
||||
*/
|
||||
service_level& get_service_level(sstring service_level_name) {
|
||||
auto sl_it = _service_levels_db.find(service_level_name);
|
||||
if (sl_it == _service_levels_db.end() || sl_it->second.marked_for_deletion) {
|
||||
sl_it = _service_levels_db.find(default_service_level_name);
|
||||
}
|
||||
return sl_it->second;
|
||||
}
|
||||
|
||||
enum class set_service_level_op_type {
|
||||
add_if_not_exists,
|
||||
add,
|
||||
alter
|
||||
};
|
||||
|
||||
future<> set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type);
|
||||
|
||||
public:
|
||||
static sstring default_service_level_name;
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user