Before b59b3d4, the migration code checked that the service level controller
was on the v2 version before migrating; that check also implicitly verified
that the _sl_data_accessor field was already initialized. Now that the check
is gone, the migration can start before the service level controller is fully
initialized. Re-add the check, but in a different place.
Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1049
Closes scylladb/scylladb#29021
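
The re-added guard (see migrate_to_driver_service_level in the file below) amounts to skipping the migration step while _sl_data_accessor is still unset. A minimal sketch of the idea, using the names from the code below:

    if (!_sl_data_accessor) {
        // Controller not fully initialized yet: leave the group0 guard untouched so the caller can retry later.
        co_return std::move(guard);
    }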
/*
 * Copyright (C) 2020-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "cql3/util.hh"
#include "utils/assert.hh"
#include <chrono>

#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include "cql3/untyped_result_set.hh"
#include "db/config.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service_level_controller.hh"
#include "db/system_distributed_keyspace.hh"
#include "cql3/query_processor.hh"
#include "service/storage_service.hh"
#include "service/topology_state_machine.hh"
#include "utils/sorting.hh"
#include <seastar/core/reactor.hh>
#include "utils/managed_string.hh"

namespace qos {
static logging::logger sl_logger("service_level_controller");

sstring service_level_controller::default_service_level_name = "default";
constexpr const char* scheduling_group_name_pattern = "sl:{}";
constexpr const char* deleted_scheduling_group_name_pattern = "sl_deleted:{}";
constexpr const char* temp_scheduling_group_name_pattern = "sl_temp:{}";

service_level_controller::auth_integration::auth_integration(service_level_controller& sl_controller, auth::service& auth_service)
    : _sl_controller(sl_controller)
    , _auth_service(auth_service)
    , _stop_gate("service_level_controller_auth_integration_stop_gate")
{}

future<> service_level_controller::auth_integration::stop() {
    co_await _stop_gate.close();
}

void service_level_controller::auth_integration::clear_cache() {
    _cache.clear();
}

service_level_controller::service_level_controller(sharded<auth::service>& auth_service, locator::shared_token_metadata& tm, abort_source& as, service_level_options default_service_level_config,
        scheduling_supergroup user_ssg, scheduling_group default_scheduling_group, bool destroy_default_sg_on_drain)
    : _sl_data_accessor(nullptr)
    , _auth_service(auth_service)
    , _token_metadata(tm)
    , _last_successful_config_update(seastar::lowres_clock::now())
    , _logged_intervals(0)
    , _early_abort_subscription(as.subscribe([this] () noexcept { do_abort(); }))
{
    // We can't rename the system default scheduling group, so we have to reject it.
    assert(default_scheduling_group != get_default_scheduling_group());
    if (this_shard_id() == global_controller) {
        _global_controller_db = std::make_unique<global_controller_data>();
        _global_controller_db->default_service_level_config = default_service_level_config;
        _global_controller_db->user_ssg = user_ssg;
        _global_controller_db->default_sg = default_scheduling_group;
        _global_controller_db->destroy_default_sg = destroy_default_sg_on_drain;
        // Since the first thing done is adding the default service level, we only
        // need to add the given group to the pool of scheduling groups for reuse.
        _global_controller_db->deleted_scheduling_groups.emplace_back(default_scheduling_group);
    }
}

future<> service_level_controller::add_service_level(sstring name, service_level_options slo, bool is_static) {
    return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) {
        return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, slo, is_static] () {
            return sl_controller.do_add_service_level(name, slo, is_static);
        });
    });
}

future<> service_level_controller::remove_service_level(sstring name, bool remove_static) {
    return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) {
        return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, remove_static] () {
            return sl_controller.do_remove_service_level(name, remove_static);
        });
    });
}

future<> service_level_controller::start() {
    if (this_shard_id() != global_controller) {
        return make_ready_future();
    }
    return with_semaphore(_global_controller_db->notifications_serializer, 1, [this] () {
        return do_add_service_level(default_service_level_name, _global_controller_db->default_service_level_config, true).then([this] () {
            return container().invoke_on_all([] (service_level_controller& sl) {
                sl._default_service_level = sl.get_service_level(default_service_level_name);
            });
        });
    });
}

void service_level_controller::set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor) {
    // Unregistering the accessor is always legal.
    if (!sl_data_accessor) {
        _sl_data_accessor = nullptr;
    }

    // Registration of a new accessor can be done only when _sl_data_accessor is not already set.
    // This behavior is intended to allow unit tests to set this value without having it
    // overridden by storage_proxy.
    if (!_sl_data_accessor) {
        _sl_data_accessor = sl_data_accessor;
    }
}

void service_level_controller::reload_distributed_data_accessor(cql3::query_processor& qp, service::raft_group0_client& g0) {
    auto accessor = static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
        make_shared<qos::raft_service_level_distributed_data_accessor>(qp, g0));
    set_distributed_data_accessor(std::move(accessor));
}

void service_level_controller::do_abort() noexcept {
    if (this_shard_id() != global_controller) {
        return;
    }

    // abort the loop of the distributed data checking if it is running
    if (!_global_controller_db->dist_data_update_aborter.abort_requested()) {
        _global_controller_db->dist_data_update_aborter.request_abort();
    }

    abort_group0_operations();
}

future<> service_level_controller::stop() {
    if (this_shard_id() != global_controller) {
        co_return;
    }

    // If abort source didn't fire, do it now
    _early_abort_subscription->on_abort(std::nullopt);

    _global_controller_db->notifications_serializer.broken();
    try {
        auto f = co_await coroutine::as_future(std::exchange(_global_controller_db->distributed_data_update, make_ready_future<>()));
        // delete all sg's in _service_levels_db, leaving it empty.
        for (auto it = _service_levels_db.begin(); it != _service_levels_db.end(); ) {
            _global_controller_db->deleted_scheduling_groups.emplace_back(it->second.sg);
            it = _service_levels_db.erase(it);
        }
        f.get();
    } catch (const broken_semaphore& ignored) {
    } catch (const sleep_aborted& ignored) {
    } catch (const exceptions::unavailable_exception& ignored) {
    } catch (const exceptions::read_timeout_exception& ignored) {
    }

    // exclude scheduling groups we shouldn't destroy
    std::erase_if(_global_controller_db->deleted_scheduling_groups, [this] (scheduling_group& sg) {
        if (sg == default_scheduling_group()) {
            return true;
        } else if (!_global_controller_db->destroy_default_sg && _global_controller_db->default_sg == sg) {
            return true;
        } else {
            return false;
        }
    });

    // destroy all sg's in _global_controller_db->deleted_scheduling_groups, leaving it empty
    // if any destroy_scheduling_group call fails, rethrow one of the exceptions
    std::deque<scheduling_group> deleted_scheduling_groups = std::move(_global_controller_db->deleted_scheduling_groups);
    std::exception_ptr ex;

    while (!deleted_scheduling_groups.empty()) {
        auto f = co_await coroutine::as_future(destroy_scheduling_group(deleted_scheduling_groups.front()));
        if (f.failed()) {
            auto e = f.get_exception();
            sl_logger.error("Destroying scheduling group \"{}\" on stop failed: {}. Ignored.", deleted_scheduling_groups.front().name(), e);
            ex = std::move(e);
        }
        deleted_scheduling_groups.pop_front();
    }
    if (ex) {
        std::rethrow_exception(std::move(ex));
    }
}

void service_level_controller::abort_group0_operations() {
    // abort group0 operations
    if (!_global_controller_db->group0_aborter.abort_requested()) {
        _global_controller_db->group0_aborter.request_abort();
    }
}

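// Rebuilds the local service level cache (_service_levels_db) from the distributed data:
// fetch the current distributed state, compute the add/update/delete delta against the local
// state, and apply it through do_add_service_level()/do_remove_service_level().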
future<> service_level_controller::update_service_levels_cache(qos::query_context ctx) {
    SCYLLA_ASSERT(this_shard_id() == global_controller);

    if (!_sl_data_accessor) {
        return make_ready_future();
    }

    return with_semaphore(_global_controller_db->notifications_serializer, 1, [this, ctx] () {
        return async([this, ctx] () {
            service_levels_info service_levels;
            // The next statement can throw, but that's fine since we would like the caller
            // to be able to aggregate those failures and only report when it is critical or noteworthy.
            // One common reason for failure is that one of the nodes went down and, before this node
            // detects it, the scan query done inside this call fails.
            service_levels = _sl_data_accessor->get_service_levels(ctx).get();

            service_levels_info service_levels_for_update;
            service_levels_info service_levels_for_add;
            service_levels_info service_levels_for_delete;

            auto current_it = _service_levels_db.begin();
            auto new_state_it = service_levels.begin();

            // We want to detect 3 kinds of objects in one pass:
            // 1. new service levels that have been added to the distributed keyspace
            // 2. existing service levels that have changed
            // 3. removed service levels
            // This loop batches the add/update operations and the remove operations together,
            // and then they are all executed together. The reason for this is to first delete
            // everything that needs to be deleted and only then add the new
            // service levels.
            while (current_it != _service_levels_db.end() && new_state_it != service_levels.end()) {
                if (current_it->first.starts_with('$')) {
                    sl_logger.warn("Service level names starting with '$' are reserved for internal tenants. Rename service level \"{}\" to drop '$' prefix.", current_it->first.c_str());
                }

                if (current_it->first == new_state_it->first) {
                    // The service level exists in both the current and the new state.
                    if (current_it->second.slo != new_state_it->second) {
                        // The service level configuration is different
                        // in the new state and the old state, meaning it needs to be updated.
                        service_levels_for_update.insert(*new_state_it);
                    }
                    current_it++;
                    new_state_it++;
                } else if (current_it->first < new_state_it->first) {
                    // The service level does not exist in the new state so it needs to be
                    // removed, but only if it is not static, since static configurations don't
                    // come from the distributed keyspace but from code.
                    if (!current_it->second.is_static) {
                        service_levels_for_delete.emplace(current_it->first, current_it->second.slo);
                    }
                    current_it++;
                } else { /* new_state_it->first < current_it->first */
                    // The service level exists in the new state but not in the old state,
                    // so it needs to be added.
                    service_levels_for_add.insert(*new_state_it);
                    new_state_it++;
                }
            }

            for (; current_it != _service_levels_db.end(); current_it++) {
                if (!current_it->second.is_static) {
                    service_levels_for_delete.emplace(current_it->first, current_it->second.slo);
                }
            }
            for (; new_state_it != service_levels.end(); new_state_it++) {
                service_levels_for_add.emplace(new_state_it->first, new_state_it->second);
            }

            for (auto&& sl : service_levels_for_delete) {
                do_remove_service_level(sl.first, false).get();
                sl_logger.info("service level \"{}\" was deleted.", sl.first.c_str());
            }
            for (auto&& sl : service_levels_for_update) {
                do_add_service_level(sl.first, sl.second).get();
                sl_logger.info("service level \"{}\" was updated. New values: (timeout: {}, workload_type: {}, shares: {})",
                        sl.first, sl.second.timeout, sl.second.workload, sl.second.shares);
            }

            if (_auth_integration) {
                _auth_integration->clear_cache();
            }

            for (auto&& sl : service_levels_for_add) {
                bool make_room = false;
                std::map<sstring, service_level>::reverse_iterator it;
                try {
                    do_add_service_level(sl.first, sl.second).get();
                    sl_logger.info("service level \"{}\" was added.", sl.first.c_str());
                } catch (service_level_scheduling_groups_exhausted &ex) {
                    it = _service_levels_db.rbegin();
                    if (it->first == default_service_level_name) {
                        it++;
                    }
                    if (it->first.compare(sl.first) > 0) {
                        make_room = true;
                    } else {
                        _effectively_dropped_sls.insert(sl.first);
                        sl_logger.warn("{}", ex.what());
                    }
                }
                if (make_room) {
                    sl_logger.warn("service level \"{}\" will be effectively dropped to make scheduling group available to \"{}\", please consider removing a service level."
                            , it->first, sl.first);
                    do_remove_service_level(it->first, false).get();
                    _effectively_dropped_sls.insert(it->first);
                    do_add_service_level(sl.first, sl.second).get();
                }
            }

        });
    });
}

future<> service_level_controller::auth_integration::reload_cache(qos::query_context ctx) {
    SCYLLA_ASSERT(this_shard_id() == global_controller);
    const auto _ = _stop_gate.hold();

    auto units = co_await get_units(_sl_controller._global_controller_db->notifications_serializer, 1);

    auto& qs = qos_query_state(ctx);
    auto& role_manager = _auth_service.underlying_role_manager();
    const auto all_roles = co_await role_manager.query_all(qs);
    const auto hierarchy = co_await role_manager.query_all_directly_granted(qs);
    // includes only roles with an attached service level
    const auto attributes = co_await role_manager.query_attribute_for_all("service_level", qs);

    std::map<sstring, service_level_options> effective_sl_map;

    auto sorted = co_await utils::topological_sort(all_roles, hierarchy);
    // Roles are sorted from the top of the hierarchy to the bottom.
    // `GRANT role1 TO role2` means role2 is higher in the hierarchy than role1, so role2 will be before
    // role1 in the `sorted` vector.
    // That's why, if we iterate over the vector in reverse order, we will visit the roles from the bottom
    // and we can use the already-calculated effective service levels for all of the subroles.
    for (auto& role: sorted | std::views::reverse) {
        std::optional<service_level_options> sl_options;

        if (auto sl_name_it = attributes.find(role); sl_name_it != attributes.end()) {
            if (auto sl_it = _sl_controller._service_levels_db.find(sl_name_it->second); sl_it != _sl_controller._service_levels_db.end()) {
                sl_options = sl_it->second.slo;
                sl_options->init_effective_names(sl_name_it->second);
                sl_options->shares_name = sl_name_it->second;
            } else if (_sl_controller._effectively_dropped_sls.contains(sl_name_it->second)) {
                // The service level might be effectively dropped; then it's not present in `_service_levels_db`.
                sl_logger.warn("Service level {} is effectively dropped and its values are ignored.", sl_name_it->second);
            } else {
                sl_logger.error("Couldn't find service level {} in first level cache", sl_name_it->second);
            }
        }

        auto [it, it_end] = hierarchy.equal_range(role);
        while (it != it_end) {
            auto& subrole = it->second;
            if (auto sub_sl_it = effective_sl_map.find(subrole); sub_sl_it != effective_sl_map.end()) {
                if (sl_options) {
                    sl_options = sl_options->merge_with(sub_sl_it->second);
                } else {
                    sl_options = sub_sl_it->second;
                }
            }

            ++it;
        }

        if (sl_options) {
            effective_sl_map.insert({role, *sl_options});
        }
        co_await coroutine::maybe_yield();
    }

    co_await _sl_controller.container().invoke_on_all([effective_sl_map] (service_level_controller& sl_controller) -> future<> {
        // We probably cannot predict if `auth_integration` is still in place on another shard,
        // so let's play it safe here.
        if (sl_controller._auth_integration) {
            sl_controller._auth_integration->_cache = std::move(effective_sl_map);
        }
        co_await sl_controller.notify_effective_service_levels_cache_reloaded();
    });
}

future<> service_level_controller::update_cache(update_both_cache_levels update_both_cache_levels, qos::query_context ctx) {
    SCYLLA_ASSERT(this_shard_id() == global_controller);
    if (update_both_cache_levels) {
        co_await update_service_levels_cache(ctx);
    }

    if (_auth_integration) {
        co_await _auth_integration->reload_cache(ctx);
    }
}

static service_level_options get_driver_service_level_slo() {
    service_level_options slo;
    slo.shares = 200;
    slo.workload = service_level_options::workload_type::batch;
    return slo;
}

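// Produces the group0 mutations that create the "driver" service level with its default
// options and set the service_level_driver_created flag in the system keyspace, both at the
// given timestamp, so that the two changes can be committed together.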
future<utils::chunked_vector<mutation>> service_level_controller::get_create_driver_service_level_mutations(db::system_keyspace& sys_ks, api::timestamp_type timestamp) {

    utils::chunked_vector<mutation> mutations;

    auto sl_mutations = co_await raft_service_level_distributed_data_accessor::set_service_level_mutations(sys_ks.query_processor(), service_level_controller::driver_service_level_name, get_driver_service_level_slo(), timestamp);
    std::move(sl_mutations.begin(), sl_mutations.end(), std::back_inserter(mutations));

    auto sys_ks_mutation = co_await sys_ks.make_service_level_driver_created_mutation(true, timestamp);
    mutations.push_back(std::move(sys_ks_mutation));

    co_return mutations;
}

future<std::optional<service::group0_guard>> service_level_controller::migrate_to_driver_service_level(service::group0_guard guard, db::system_keyspace& sys_ks) {
    // Don't try creating driver service level too often if it already failed.
    // We don't want to block the topology coordinator.
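    // The _sl_data_accessor check below is the re-added initialization guard: if the accessor
    // has not been set yet, the controller is not fully initialized, so skip the migration and
    // return the guard untouched instead.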
    if (_sl_data_accessor && _last_unsuccessful_driver_sl_creation_attemp + 5min < seastar::lowres_clock::now()) {
        sl_logger.info("migrate_to_driver_service_level: starting sl:{} creation", service_level_controller::driver_service_level_name);
        try {
            service::group0_batch mc{std::move(guard)};

            constexpr bool if_not_exists = true;
            co_await add_distributed_service_level(service_level_controller::driver_service_level_name, get_driver_service_level_slo(), if_not_exists, mc);

            auto sys_ks_mutation = co_await sys_ks.make_service_level_driver_created_mutation(true, mc.write_timestamp());
            mc.add_mutation(std::move(sys_ks_mutation), "set service_level_driver_created=true");

            co_await commit_mutations(std::move(mc));
            sl_logger.info("create_driver_service_level: sl:{} created", service_level_controller::driver_service_level_name);
        } catch (service::group0_concurrent_modification&) {
            throw; // Let caller handle `group0_concurrent_modification`
        } catch (...) {
            sl_logger.error("Failed to create service level for driver: {}. Removal of user service levels below the limit is necessary to allow sl:driver creation.", std::current_exception());
            _last_unsuccessful_driver_sl_creation_attemp = seastar::lowres_clock::now();
        }
        co_return std::nullopt;
    }
    co_return std::move(guard); // return guard untouched
}

std::optional<service_level_options> service_level_controller::auth_integration::find_cached_effective_service_level(const sstring& role_name) {
    auto effective_sl_it = _cache.find(role_name);
    return effective_sl_it != _cache.end()
            ? std::optional<service_level_options>(effective_sl_it->second)
            : std::nullopt;
}

std::optional<service_level_options> service_level_controller::find_cached_effective_service_level(const sstring& role_name) {
    SCYLLA_ASSERT(_auth_integration != nullptr);
    return _auth_integration->find_cached_effective_service_level(role_name);
}

future<> service_level_controller::notify_service_level_added(sstring name, service_level sl_data) {
    return seastar::async([this, name, sl_data] {
        service_level_info sl_info = {
            .name = name,
            .sg = sl_data.sg,
        };
        _subscribers.thread_for_each([name, sl_data, sl_info] (qos_configuration_change_subscriber* subscriber) {
            try {
                subscriber->on_before_service_level_add(sl_data.slo, sl_info).get();
            } catch (...) {
                sl_logger.error("notify_service_level_added: exception occurred in one of the observers callbacks {}", std::current_exception());
            }
        });
        auto result = _service_levels_db.emplace(name, sl_data);
        if (result.second) {
            unsigned sl_idx = internal::scheduling_group_index(sl_data.sg);
            _sl_lookup[sl_idx].first = &(result.first->first);
            _sl_lookup[sl_idx].second = &(result.first->second);
        }
    });
}

future<> service_level_controller::notify_service_level_updated(sstring name, service_level_options slo) {
    auto sl_it = _service_levels_db.find(name);
    future<> f = make_ready_future();
    if (sl_it != _service_levels_db.end()) {
        service_level_options slo_before = sl_it->second.slo;
        return seastar::async([this, sl_it, name, slo_before, slo] {
            future<> f = make_ready_future();
            service_level_info sl_info = {
                .name = name,
                .sg = sl_it->second.sg,
            };
            _subscribers.thread_for_each([name, slo_before, slo, sl_info] (qos_configuration_change_subscriber* subscriber) {
                try {
                    subscriber->on_before_service_level_change(slo_before, slo, sl_info).get();
                } catch (...) {
                    sl_logger.error("notify_service_level_updated: exception occurred in one of the observers callbacks {}", std::current_exception());
                }
            });
            if (sl_it->second.slo.shares != slo.shares) {
                int32_t new_shares = default_shares;
                if (auto new_shares_p = std::get_if<int32_t>(&slo.shares)) {
                    new_shares = *new_shares_p;
                }
                sl_it->second.sg.set_shares(new_shares);
            }

            sl_it->second.slo = slo;
        });
    }
    return f;
}

future<> service_level_controller::notify_service_level_removed(sstring name) {
    auto sl_it = _service_levels_db.find(name);
    if (sl_it != _service_levels_db.end()) {
        unsigned sl_idx = internal::scheduling_group_index(sl_it->second.sg);
        _sl_lookup[sl_idx].first = nullptr;
        _sl_lookup[sl_idx].second = nullptr;
        if (this_shard_id() == global_controller) {
            _global_controller_db->deleted_scheduling_groups.emplace_back(sl_it->second.sg);
            co_await rename_scheduling_group(sl_it->second.sg, seastar::format(deleted_scheduling_group_name_pattern, sl_it->first));
        }
        service_level_info sl_info = {
            .name = name,
            .sg = sl_it->second.sg,
        };
        _service_levels_db.erase(sl_it);
        co_return co_await seastar::async([this, name, sl_info] {
            _subscribers.thread_for_each([name, sl_info] (qos_configuration_change_subscriber* subscriber) {
                try {
                    subscriber->on_after_service_level_remove(sl_info).get();
                } catch (...) {
                    sl_logger.error("notify_service_level_removed: exception occurred in one of the observers callbacks {}", std::current_exception());
                }
            });
        });
    }
    co_return;
}

scheduling_group service_level_controller::get_default_scheduling_group() {
    return _default_service_level.sg;
}

scheduling_group service_level_controller::get_scheduling_group(sstring service_level_name) {
    auto service_level_it = _service_levels_db.find(service_level_name);
    if (service_level_it != _service_levels_db.end()) {
        return service_level_it->second.sg;
    } else {
        return get_default_scheduling_group();
    }
}

scheduling_group service_level_controller::auth_integration::get_user_cached_scheduling_group(const std::optional<auth::authenticated_user>& usr) {
    if (usr && usr->name) {
        auto sl_opt = find_cached_effective_service_level(*usr->name);
        auto& sl_name = (sl_opt && sl_opt->shares_name) ? *sl_opt->shares_name : default_service_level_name;
        return _sl_controller.get_scheduling_group(sl_name);
    } else {
        return _sl_controller.get_default_scheduling_group();
    }
}

scheduling_group service_level_controller::get_cached_user_scheduling_group(const std::optional<auth::authenticated_user>& usr) {
    // The maintenance socket can communicate with Scylla before `auth_integration`
    // is registered, and we need to prepare for it.
    if (!_auth_integration) {
        return get_default_scheduling_group();
    }

    return _auth_integration->get_user_cached_scheduling_group(usr);
}

std::optional<sstring> service_level_controller::get_active_service_level() {
    unsigned sched_idx = internal::scheduling_group_index(current_scheduling_group());
    if (_sl_lookup[sched_idx].first) {
        return sstring(*_sl_lookup[sched_idx].first);
    } else {
        return std::nullopt;
    }
}

future<> service_level_controller::notify_effective_service_levels_cache_reloaded() {
    co_await _subscribers.for_each([] (qos_configuration_change_subscriber* subscriber) -> future<> {
        return subscriber->on_effective_service_levels_cache_reloaded();
    });
}

future<> service_level_controller::add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exists, service::group0_batch& mc) {
    set_service_level_op_type add_type = if_not_exists ? set_service_level_op_type::add_if_not_exists : set_service_level_op_type::add;
    return set_distributed_service_level(name, slo, add_type, mc);
}

future<> service_level_controller::alter_distributed_service_level(sstring name, service_level_options slo, service::group0_batch& mc) {
    return set_distributed_service_level(name, slo, set_service_level_op_type::alter, mc);
}

future<> service_level_controller::drop_distributed_service_level(sstring name, bool if_exists, service::group0_batch& mc) {
    auto sl_info = co_await _sl_data_accessor->get_service_levels();
    auto it = sl_info.find(name);
    if (it == sl_info.end()) {
        if (if_exists) {
            co_return;
        } else {
            throw nonexistant_service_level_exception(name);
        }
    }

    auto& role_manager = _auth_service.local().underlying_role_manager();
    auto attributes = co_await role_manager.query_attribute_for_all("service_level");

    co_await coroutine::parallel_for_each(attributes, [&role_manager, name, &mc] (auto&& attr) {
        if (attr.second == name) {
            return do_with(attr.first, [&role_manager, &mc] (const sstring& role_name) {
                return role_manager.remove_attribute(role_name, "service_level", mc);
            });
        } else {
            return make_ready_future();
        }
    });

    co_return co_await _sl_data_accessor->drop_service_level(name, mc);
}

future<service_levels_info> service_level_controller::get_distributed_service_levels(qos::query_context ctx) {
    return _sl_data_accessor ? _sl_data_accessor->get_service_levels(ctx) : make_ready_future<service_levels_info>();
}

future<service_levels_info> service_level_controller::get_distributed_service_level(sstring service_level_name) {
    return _sl_data_accessor ? _sl_data_accessor->get_service_level(service_level_name) : make_ready_future<service_levels_info>();
}

future<bool> service_level_controller::validate_before_service_level_add() {
    assert(this_shard_id() == global_controller);
    if (_global_controller_db->deleted_scheduling_groups.size() > 0) {
        return make_ready_future<bool>(true);
    } else if (_global_controller_db->scheduling_groups_exhausted) {
        return make_ready_future<bool>(false);
    } else {
        return create_scheduling_group(seastar::format(temp_scheduling_group_name_pattern, _global_controller_db->unique_group_counter++), "", 1, _global_controller_db->user_ssg).then_wrapped([this] (future<scheduling_group> new_sg_f) {
            if (new_sg_f.failed()) {
                new_sg_f.ignore_ready_future();
                _global_controller_db->scheduling_groups_exhausted = true;
                return make_ready_future<bool>(false);
            }
            _global_controller_db->deleted_scheduling_groups.emplace_back(new_sg_f.get());
            return make_ready_future<bool>(true);
        });
    }
}

future<> service_level_controller::set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type, service::group0_batch& mc) {
    auto sl_info = co_await _sl_data_accessor->get_service_levels();
    auto it = sl_info.find(name);
    // test for illegal requests or requests that should terminate without any action
    if (it == sl_info.end()) {
        if (op_type == set_service_level_op_type::alter) {
            throw exceptions::invalid_request_exception(format("The service level '{}' doesn't exist.", name));
        }
    } else {
        if (op_type == set_service_level_op_type::add) {
            throw exceptions::invalid_request_exception(format("The service level '{}' already exists.", name));
        } else if (op_type == set_service_level_op_type::add_if_not_exists) {
            co_return;
        }
    }

    if (op_type != set_service_level_op_type::alter) {
        bool validation_result = co_await container().invoke_on(global_controller, &service_level_controller::validate_before_service_level_add);
        if (!validation_result && !utils::get_local_injector().enter("allow_service_level_over_limit")) {
            throw exceptions::invalid_request_exception("Can't create service level - no more scheduling groups exist");
        }
    }
    co_return co_await _sl_data_accessor->set_service_level(name, slo, mc);
}

future<> service_level_controller::do_add_service_level(sstring name, service_level_options slo, bool is_static) {
    auto service_level_it = _service_levels_db.find(name);
    if (is_static) {
        _global_controller_db->static_configurations[name] = slo;
    }
    if (service_level_it != _service_levels_db.end()) {
        if ((is_static && service_level_it->second.is_static) || !is_static) {
            if ((service_level_it->second.is_static) && (!is_static)) {
                service_level_it->second.is_static = false;
            }
            return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, slo);
        } else {
            // This means we are setting the static configuration while the service level
            // is running on the non-static configuration, so we have nothing
            // else to do since we already saved the static configuration.
            return make_ready_future();
        }
    } else {
        return do_with(service_level(slo, is_static, default_scheduling_group()),
                std::move(name), [this] (service_level& sl, sstring& name) {
            return make_ready_future().then([this, &sl, &name] () mutable {
                int32_t share_count = default_shares;
                if (auto* maybe_shares = std::get_if<int32_t>(&sl.slo.shares)) {
                    share_count = *maybe_shares;
                }

                if (!_global_controller_db->deleted_scheduling_groups.empty()) {
                    auto&& it = std::find_if(_global_controller_db->deleted_scheduling_groups.begin()
                            , _global_controller_db->deleted_scheduling_groups.end()
                            , [sg_name_to_find = seastar::format(deleted_scheduling_group_name_pattern, name)] (const scheduling_group& sg) {
                                return (sg.name() == sg_name_to_find);
                            });
                    if (it != _global_controller_db->deleted_scheduling_groups.end()) {
                        sl.sg = *it;
                        _global_controller_db->deleted_scheduling_groups.erase(it);
                    } else {
                        sl.sg = _global_controller_db->deleted_scheduling_groups.front();
                        _global_controller_db->deleted_scheduling_groups.pop_front();
                    }
                    return container().invoke_on_all([&sl, share_count] (service_level_controller& service) {
                        scheduling_group non_const_sg = sl.sg;
                        return non_const_sg.set_shares((float)share_count);
                    }).then([&sl, &name] {
                        return rename_scheduling_group(sl.sg, seastar::format(scheduling_group_name_pattern, name));
                    });
                } else if (_global_controller_db->scheduling_groups_exhausted) {
                    return make_exception_future<>(service_level_scheduling_groups_exhausted(name));
                } else {
                    return create_scheduling_group(seastar::format(scheduling_group_name_pattern, name), "", share_count, _global_controller_db->user_ssg).then_wrapped([this, name, &sl] (future<scheduling_group> sg_fut) {
                        if (sg_fut.failed()) {
                            sg_fut.ignore_ready_future();
                            _global_controller_db->scheduling_groups_exhausted = true;
                            return make_exception_future<>(service_level_scheduling_groups_exhausted(name));
                        }
                        sl.sg = sg_fut.get();
                        return make_ready_future<>();
                    });
                }
            }).then([this, &sl, &name] () {
                return container().invoke_on_all(&service_level_controller::notify_service_level_added, name, sl);
            });
        });
    }
    return make_ready_future();
}

future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
    auto service_level_it = _service_levels_db.find(name);
    if (service_level_it != _service_levels_db.end()) {
        auto static_conf_it = _global_controller_db->static_configurations.end();
        bool static_exists = false;
        if (remove_static) {
            _global_controller_db->static_configurations.erase(name);
        } else {
            static_conf_it = _global_controller_db->static_configurations.find(name);
            static_exists = static_conf_it != _global_controller_db->static_configurations.end();
        }
        if (remove_static && service_level_it->second.is_static) {
            return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name);
        } else if (!remove_static && !service_level_it->second.is_static) {
            if (static_exists) {
                service_level_it->second.is_static = true;
                return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, static_conf_it->second);
            } else {
                return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name);
            }
        }
    }
    return make_ready_future();
}

void service_level_controller::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {
    if (this_shard_id() == global_controller && _token_metadata.get()->get_topology().is_me(hid)) {
        _global_controller_db->dist_data_update_aborter.request_abort();
        _global_controller_db->group0_aborter.request_abort();
    }
}

void service_level_controller::register_subscriber(qos_configuration_change_subscriber* subscriber) {
    _subscribers.add(subscriber);
}

future<> service_level_controller::unregister_subscriber(qos_configuration_change_subscriber* subscriber) {
    return _subscribers.remove(subscriber);
}

enum class describe_cmd: uint8_t {
    CREATE,
    CREATE_IF_NOT_EXISTS,
    ALTER,
    DROP_IF_EXISTS,
};

std::string_view describe_cmd_to_sstring(describe_cmd cmd_enum) {
    switch (cmd_enum) {
        case describe_cmd::CREATE:
            return "CREATE SERVICE LEVEL";
        case describe_cmd::CREATE_IF_NOT_EXISTS:
            return "CREATE SERVICE LEVEL IF NOT EXISTS";
        case describe_cmd::ALTER:
            return "ALTER SERVICE LEVEL";
        case describe_cmd::DROP_IF_EXISTS:
            return "DROP SERVICE LEVEL IF EXISTS";
    };
}

static sstring describe_service_level(std::string_view sl_name, const service_level_options& sl_opts, describe_cmd cmd=describe_cmd::CREATE) {
    using slo = service_level_options;

    utils::small_vector<sstring, 3> opts{};

    const sstring sl_name_formatted = cql3::util::maybe_quote(sl_name);

    if (auto maybe_timeout = std::get_if<lowres_clock::duration>(&sl_opts.timeout)) {
        // According to the documentation, `TIMEOUT` has to be expressed in milliseconds
        // or seconds. It is therefore safe to use milliseconds here.
        const auto timeout = std::chrono::duration_cast<std::chrono::milliseconds>(*maybe_timeout);
        opts.push_back(seastar::format("TIMEOUT = {}", timeout));
    }

    switch (sl_opts.workload) {
        case slo::workload_type::batch:
            opts.push_back("WORKLOAD_TYPE = 'batch'");
            break;
        case slo::workload_type::interactive:
            opts.push_back("WORKLOAD_TYPE = 'interactive'");
            break;
        case slo::workload_type::unspecified:
            break;
        case slo::workload_type::delete_marker:
            // `slo::workload_type::delete_marker` is only set temporarily. When a service level
            // is actually created, it never has this workload type set anymore.
            on_internal_error(sl_logger, "Unexpected workload type");
    }

    if (auto* maybe_shares = std::get_if<int32_t>(&sl_opts.shares)) {
        opts.push_back(seastar::format("SHARES = {}", *maybe_shares));
    }

    if (opts.size() == 0) {
        return seastar::format("{} {};", describe_cmd_to_sstring(cmd), sl_name_formatted);
    }

    return seastar::format("{} {} WITH {};", describe_cmd_to_sstring(cmd), sl_name_formatted, fmt::join(opts, " AND "));
}

utils::small_vector<cql3::description, 2> describe_driver_service_level(const std::optional<service_level_options>& driver_service_level_slo) {
    utils::small_vector<cql3::description, 2> result;
    const auto service_level_type = "service_level";
    if (driver_service_level_slo.has_value()) {
        // We need to use CREATE IF NOT EXISTS because the `driver` service level may have already been created automatically.
        // We also need to ALTER because, if `driver` exists, it can have a different shares number.
        const sstring create_statement = describe_service_level(service_level_controller::driver_service_level_name, driver_service_level_slo.value(), describe_cmd::CREATE_IF_NOT_EXISTS);
        const sstring alter_statement = describe_service_level(service_level_controller::driver_service_level_name, driver_service_level_slo.value(), describe_cmd::ALTER);

        result.push_back(cql3::description {
            .keyspace = std::nullopt,
            .type = service_level_type,
            .name = service_level_controller::driver_service_level_name,
            .create_statement = managed_string(create_statement)
        });
        result.push_back(cql3::description {
            .keyspace = std::nullopt,
            .type = service_level_type,
            .name = service_level_controller::driver_service_level_name,
            .create_statement = managed_string(alter_statement)
        });
    } else {
        const sstring drop_statement = describe_service_level(service_level_controller::driver_service_level_name, service_level_options{}, describe_cmd::DROP_IF_EXISTS);
        result.push_back(cql3::description {
            .keyspace = std::nullopt,
            .type = service_level_type,
            .name = service_level_controller::driver_service_level_name,
            .create_statement = managed_string(drop_statement)
        });
    }
    return result;
}

future<std::vector<cql3::description>> service_level_controller::describe_created_service_levels() const {

    std::vector<cql3::description> result{};

    // If we use Raft, we can rely on the cache and avoid a query.
    // The cache gets updated when applying Raft log, so it's always up-to-date
    // with the table.
    //
    // If Raft is not used, that means we're performing a rolling upgrade right now
    // or the migration to topology on Raft hasn't started yet. It's highly unlikely
    // anyone will try to make a backup in that situation, so we don't have a branch here.
    //
    // If Raft is not used yet, updating the cache will happen every 10 seconds. We deem it
    // good enough if someone does attempt to make a backup in that state.

    std::optional<service_level_options> driver_service_level_slo;
    for (const auto& [sl_name, sl] : _service_levels_db) {
        if (sl.is_static) {
            continue;
        }
        if (sl_name == driver_service_level_name) {
            driver_service_level_slo = sl.slo;
            continue;
        }

        sstring create_statement = describe_service_level(sl_name, sl.slo);

        result.push_back(cql3::description {
            // Service levels do not belong to any keyspace.
            .keyspace = std::nullopt,
            .type = "service_level",
            .name = sl_name,
            .create_statement = managed_string(create_statement)
        });

        co_await coroutine::maybe_yield();
    }

    std::ranges::sort(result, std::less<>{}, std::mem_fn(&cql3::description::name));
    auto driver_sl_description = describe_driver_service_level(driver_service_level_slo);

    std::vector<cql3::description> combined;
    combined.reserve(result.size() + driver_sl_description.size());
    std::move(driver_sl_description.begin(), driver_sl_description.end(), std::back_inserter(combined));
    std::move(result.begin(), result.end(), std::back_inserter(combined));
    co_return combined;
}

future<std::vector<cql3::description>> service_level_controller::auth_integration::describe_attached_service_levels() {
    const auto _ = _stop_gate.hold();

    const auto attached_service_levels = co_await _auth_service.underlying_role_manager().query_attribute_for_all("service_level");

    std::vector<cql3::description> result{};
    result.reserve(attached_service_levels.size());

    for (const auto& [role, service_level] : attached_service_levels) {
        const auto formatted_role = cql3::util::maybe_quote(role);
        const auto formatted_sl = cql3::util::maybe_quote(service_level);

        sstring create_statement = seastar::format("ATTACH SERVICE LEVEL {} TO {};", formatted_sl, formatted_role);

        result.push_back(cql3::description {
            // Attaching a service level doesn't belong to any keyspace.
            .keyspace = std::nullopt,
            .type = "service_level_attachment",
            .name = service_level,
            .create_statement = managed_string(create_statement)
        });

        co_await coroutine::maybe_yield();
    }

    std::ranges::sort(result, std::less<>{}, [] (const cql3::description& desc) {
        return std::make_tuple(std::ref(desc.name), std::ref(*desc.create_statement));
    });

    co_return result;
}

future<std::vector<cql3::description>> service_level_controller::describe_service_levels() {
    if (_auth_integration == nullptr) {
        throw std::runtime_error("Describing service levels requires that `auth_integration` has been registered, "
                "but it has not. One of the potential reasons is using the maintenance socket.");
    }

    std::vector<cql3::description> created_service_levels_descs = co_await describe_created_service_levels();
    std::vector<cql3::description> attached_service_levels_descs = co_await _auth_integration->describe_attached_service_levels();

    created_service_levels_descs.insert(created_service_levels_descs.end(),
            std::make_move_iterator(attached_service_levels_descs.begin()), std::make_move_iterator(attached_service_levels_descs.end()));

    co_return created_service_levels_descs;
}

void service_level_controller::register_auth_integration(auth::service& auth_service) {
    SCYLLA_ASSERT(_auth_integration == nullptr);
    _auth_integration = std::make_unique<auth_integration>(*this, auth_service);
}

future<> service_level_controller::unregister_auth_integration() {
    SCYLLA_ASSERT(_auth_integration != nullptr);
    // First, prevent new tasks coming to `auth_integration`.
    auto tmp = std::exchange(_auth_integration, nullptr);
    // Now we can stop it.
    co_await tmp->stop();
}

} // namespace qos