Files
scylladb/service/client_state.cc
Vlad Zolotarov 85adf6bdb1 system.clients: add a client_options column
This new column is going to contain all OPTIONS sent in the
STARTUP frame of the corresponding CQL session.

The new column has a `frozen<map<text, text>>` type, and
we are also optimizing the amount of required memory for storing
corresponding keys and values by caching them on each shard level.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-20 12:26:15 -05:00

360 lines
15 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "client_state.hh"
#include "auth/authenticator.hh"
#include "auth/common.hh"
#include "auth/resource.hh"
#include "exceptions/exceptions.hh"
#include "db/system_keyspace.hh"
#include "db/schema_tables.hh"
#include "tracing/trace_keyspace_helper.hh"
#include "db/system_distributed_keyspace.hh"
#include "replica/database.hh"
#include "utils/overloaded_functor.hh"
#include <seastar/core/coroutine.hh>
#include "service/paxos/paxos_state.hh"
// Out-of-class definition of the static, per-thread `_last_timestamp_micros`
// member of client_state; every thread starts with the value 0.
// NOTE(review): presumably the last timestamp (in microseconds) handed out on
// this thread -- the code using it is not in this file; confirm in client_state.hh.
thread_local api::timestamp_type service::client_state::_last_timestamp_micros = 0;
// Records `user` as the identity attached to this client state.
// The argument is taken by value and moved into `_user`, replacing whatever
// was stored there before. No validation is performed here.
void service::client_state::set_login(auth::authenticated_user user) {
_user = std::move(user);
}
// Verifies that the currently stored user is allowed to log in.
//
// Anonymous users pass unconditionally. For a named user the role manager is
// asked, in order, whether the role exists and whether it may log in; the
// returned future fails with exceptions::authentication_exception if either
// answer is negative.
//
// `_user` is dereferenced without a check, so a user must already have been
// stored before this is called.
future<> service::client_state::check_user_can_login() {
    if (auth::is_anonymous(*_user)) {
        co_return;
    }

    auto& roles = _auth_service->underlying_role_manager();

    // The role has to exist before its login flag is worth inspecting.
    if (!co_await roles.exists(*_user->name)) {
        throw exceptions::authentication_exception(
                format("User {} doesn't exist - create it with CREATE USER query first", *_user->name));
    }

    if (!co_await roles.can_login(*_user->name)) {
        throw exceptions::authentication_exception(format("{} is not permitted to log in", *_user->name));
    }
}
// Throws exceptions::unauthorized_exception unless a user has been stored
// in this client state.
void service::client_state::validate_login() const {
    if (_user) {
        return;
    }
    throw exceptions::unauthorized_exception("You have not logged in");
}
// Requires a logged-in, non-anonymous user.
//
// Throws exceptions::unauthorized_exception either when no user is stored
// (via validate_login()) or when the stored user is anonymous.
void service::client_state::ensure_not_anonymous() const {
    validate_login();

    const bool anonymous = auth::is_anonymous(*_user);
    if (anonymous) {
        throw exceptions::unauthorized_exception("You have to be logged in and not anonymous to perform this request");
    }
}
// Checks that the current user holds permission `p` on the root data resource.
//
// Internal client states are always allowed. Otherwise a login is required
// (validate_login()) and the returned future fails with
// exceptions::unauthorized_exception when the permission is missing.
future<> service::client_state::has_all_keyspaces_access(
        auth::permission p) const {
    if (!_is_internal) {
        validate_login();
        // Kept as a named local so it outlives the awaited permission check.
        const auth::resource all_data{auth::resource_kind::data};
        co_await ensure_has_permission({p, all_data});
    }
}
// Checks permission `p` on the data resource of keyspace `ks` by delegating
// to has_access().
future<> service::client_state::has_keyspace_access(const sstring& ks, auth::permission p) const {
    // Named local: the resource must stay alive while has_access() is awaited.
    const auth::resource keyspace_resource = auth::make_data_resource(ks);
    co_await has_access(ks, {p, keyspace_resource});
}
// Checks that the current user holds permission `p` on the root functions
// resource.
//
// Internal client states are always allowed. Otherwise a login is required:
// the permission check dereferences `_user`, so validate_login() is called
// first (the same guard has_all_keyspaces_access() uses) and a session that
// has not logged in gets exceptions::unauthorized_exception instead of an
// unchecked access to an empty `_user`.
//
// The returned future fails with exceptions::unauthorized_exception when the
// permission is missing.
future<> service::client_state::has_functions_access(auth::permission p) const {
    if (_is_internal) {
        co_return;
    }
    validate_login();
    // Named local: the resource must stay alive while the check is awaited.
    auth::resource r = auth::make_functions_resource();
    co_return co_await ensure_has_permission({p, r});
}
// Checks permission `p` on the functions resource of keyspace `ks` by
// delegating to has_access().
future<> service::client_state::has_functions_access(const sstring& ks, auth::permission p) const {
    // Named local: the resource must stay alive while has_access() is awaited.
    const auth::resource keyspace_functions = auth::make_functions_resource(ks);
    co_await has_access(ks, {p, keyspace_functions});
}
// Checks permission `p` on the single function identified by keyspace `ks`
// and `function_signature`, by delegating to has_access().
future<> service::client_state::has_function_access(const sstring& ks, const sstring& function_signature, auth::permission p) const {
    // Named local: the resource must stay alive while has_access() is awaited.
    const auth::resource function_resource = auth::make_functions_resource(ks, function_signature);
    co_await has_access(ks, {p, function_resource});
}
// Checks permission `p` on table `ks`.`cf` by delegating to has_access().
//
// `t` is forwarded as the command type of the request and
// `is_vector_indexed` is passed through unchanged.
future<> service::client_state::has_column_family_access(const sstring& ks,
        const sstring& cf, auth::permission p, auth::command_desc::type t, std::optional<bool> is_vector_indexed) const {
    // Named local: the resource must stay alive while has_access() is awaited.
    const auto table_resource = auth::make_data_resource(ks, cf);
    co_await has_access(ks, {p, table_resource, t}, is_vector_indexed);
}
// Checks permission `p` on the table described by schema `s`, using the
// schema's keyspace and table names, by delegating to has_access().
future<> service::client_state::has_schema_access(const schema& s, auth::permission p) const {
    const auto& keyspace = s.ks_name();
    // Named local: the resource must stay alive while has_access() is awaited.
    const auth::resource table_resource = auth::make_data_resource(keyspace, s.cf_name());
    co_await has_access(keyspace, {p, table_resource});
}
// Checks permission `p` on table `ks_name`.`cf_name` by delegating to
// has_access().
future<> service::client_state::has_schema_access(const sstring& ks_name, const sstring& cf_name, auth::permission p) const {
    // Named local: the resource must stay alive while has_access() is awaited.
    const auth::resource table_resource = auth::make_data_resource(ks_name, cf_name);
    co_await has_access(ks_name, {p, table_resource});
}
// Enforces the extra rules that apply to internally managed tables:
//
//  1. CDC and $paxos tables are managed internally by Scylla, so ALTER and
//     DROP on them are always refused.
//  2. $paxos tables may only be accessed by superusers, even if explicit
//     permissions have been granted to somebody else.
//
// Returns a ready future when `cmd` is acceptable and a future failed with
// exceptions::unauthorized_exception otherwise.
//
// Note: This function is on a hot path; coroutines are avoided to reduce allocations.
// Because of that, `cmd` is captured by reference in the continuation below and
// must stay alive until the returned future resolves.
future<> service::client_state::check_internal_table_permissions(std::string_view ks, std::string_view table_name, const auth::command_desc& cmd) const {
    static constexpr auto forbidden_permissions = auth::permission_set::of<
            auth::permission::ALTER, auth::permission::DROP>();
    const bool alters_or_drops = forbidden_permissions.contains(cmd.permission);

    // Failed future shared by both "ALTER/DROP is never allowed" cases.
    const auto refuse = [&cmd] () -> future<> {
        return make_exception_future(exceptions::unauthorized_exception(
                format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource)));
    };

    if (alters_or_drops) {
        const bool in_distributed_keyspace = ks == db::system_distributed_keyspace::NAME
                || ks == db::system_distributed_keyspace::NAME_EVERYWHERE;
        if (in_distributed_keyspace
                && (table_name == db::system_distributed_keyspace::CDC_DESC_V2
                    || table_name == db::system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION
                    || table_name == db::system_distributed_keyspace::CDC_TIMESTAMPS
                    || table_name == db::system_distributed_keyspace::CDC_GENERATIONS_V2)) {
            return refuse();
        }
    }

    // Everything below applies to $paxos tables only.
    if (!service::paxos::paxos_store::try_get_base_table(table_name)) {
        return make_ready_future<>();
    }
    if (alters_or_drops) {
        return refuse();
    }

    // NOTE(review): `_user->name` is dereferenced unconditionally here --
    // confirm that callers never reach this point with an anonymous user.
    return _auth_service->underlying_role_manager().is_superuser(*_user->name)
            .then([&cmd](bool is_superuser) -> future<> {
                if (is_superuser) {
                    return make_ready_future<>();
                }
                return make_exception_future(exceptions::unauthorized_exception(
                        format("Only superusers are allowed to {} {}",
                                auth::permissions::to_string(cmd.permission), cmd.resource)));
            });
}
// Central authorization check for a command `cmd` issued against keyspace `ks`.
//
// The checks run in this order:
//  - an empty `ks` is rejected with exceptions::invalid_request_exception;
//  - internal client states are allowed without further checks;
//  - a login is required (validate_login());
//  - CREATE/ALTER/DROP are refused on system keyspaces (unless the command type
//    is ALTER_SYSTEM_WITH_ALLOWED_OPTS), DROP is refused inside the tracing
//    keyspace and on the legacy auth keyspace;
//  - SELECT on a fixed set of system/schema tables is always allowed;
//  - CREATE/ALTER/DROP are refused on resources reported as protected;
//  - for data resources that name a table, check_internal_table_permissions()
//    is applied;
//  - anything other than SELECT/DESCRIBE on a data resource of a system
//    keyspace requires a non-anonymous user to be a superuser;
//  - finally the user's permissions are consulted. When `is_vector_indexed`
//    holds `true` and the command is a SELECT on a data resource, holding either
//    SELECT or VECTOR_SEARCH_INDEXING is sufficient.
//
// Authorization failures are reported as exceptions::unauthorized_exception
// through the returned future.
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd, std::optional<bool> is_vector_indexed) const {
    if (ks.empty()) {
        throw exceptions::invalid_request_exception("You have not set a keyspace for this session");
    }
    if (_is_internal) {
        co_return;
    }

    validate_login();

    static const auto alteration_permissions = auth::permission_set::of<
            auth::permission::CREATE, auth::permission::ALTER, auth::permission::DROP>();

    // we only care about schema modification.
    if (alteration_permissions.contains(cmd.permission)) {
        // prevent system keyspace modification
        auto name = ks;
        // tolower() is only defined for values representable as unsigned char
        // (or EOF), so every character is routed through unsigned char instead
        // of being passed as a possibly negative plain char.
        std::transform(name.begin(), name.end(), name.begin(), [] (unsigned char c) {
            return static_cast<char>(::tolower(c));
        });
        if (is_system_keyspace(name) && cmd.type_ != auth::command_desc::type::ALTER_SYSTEM_WITH_ALLOWED_OPTS) {
            throw exceptions::unauthorized_exception(ks + " keyspace is not user-modifiable.");
        }

        //
        // we want to disallow dropping any contents of TRACING_KS and disallow dropping the `auth::meta::legacy::AUTH_KS`
        // keyspace.
        //
        const bool dropping_anything_in_tracing = (name == tracing::trace_keyspace_helper::KEYSPACE_NAME)
                && (cmd.permission == auth::permission::DROP);
        const bool dropping_auth_keyspace = (cmd.resource == auth::make_data_resource(auth::meta::legacy::AUTH_KS))
                && (cmd.permission == auth::permission::DROP);
        if (dropping_anything_in_tracing || dropping_auth_keyspace) {
            throw exceptions::unauthorized_exception(
                    format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource));
        }
    }

    // Built once per thread: resources every logged-in user may SELECT from.
    static thread_local std::unordered_set<auth::resource> readable_system_resources = [] {
        std::unordered_set<auth::resource> tmp;
        for (auto cf : { db::system_keyspace::LOCAL, db::system_keyspace::PEERS }) {
            tmp.insert(auth::make_data_resource(db::system_keyspace::NAME, cf));
        }
        for (const auto& cf : db::schema_tables::all_table_infos(db::schema_features::full())) {
            tmp.insert(auth::make_data_resource(db::schema_tables::NAME, cf.name));
        }
        return tmp;
    }();

    if (cmd.permission == auth::permission::SELECT && readable_system_resources.contains(cmd.resource)) {
        co_return;
    }
    if (alteration_permissions.contains(cmd.permission)) {
        if (auth::is_protected(*_auth_service, cmd)) {
            throw exceptions::unauthorized_exception(format("{} is protected", cmd.resource));
        }
    }

    if (cmd.resource.kind() == auth::resource_kind::data) {
        const auto resource_view = auth::data_resource_view(cmd.resource);
        if (resource_view.table()) {
            co_await check_internal_table_permissions(ks, *resource_view.table(), cmd);
        }
    }

    // The superuser lookup is the last operand so it is only awaited when all
    // the cheaper conditions already hold.
    if (cmd.resource.kind() == auth::resource_kind::data
            && !(cmd.permission == auth::permission::SELECT || cmd.permission == auth::permission::DESCRIBE)
            && is_system_keyspace(ks)
            && _user
            && !auth::is_anonymous(*_user)
            && !co_await _auth_service->underlying_role_manager().is_superuser(*_user->name)) [[unlikely]] {
        throw exceptions::unauthorized_exception(
                ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
    }

    if (cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) {
        co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});
    }

    co_return co_await ensure_has_permission(cmd);
}
// Permission-set request: satisfied when `permissions` intersects the set
// carried by `cmd`.
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc_with_permission_set& cmd) {
    const auto& requested = cmd.permission;
    return permissions.intersects(requested);
}
// Single-permission request: satisfied when `permissions` contains the
// permission carried by `cmd`.
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc& cmd) {
    const auto& requested = cmd.permission;
    return permissions.contains(requested);
}
// Builds the message used when the current user lacks the single permission
// requested by `cmd` on its resource. `_user` must be set.
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc& cmd) const {
    const auto& who = *_user;
    const auto& target = cmd.resource;
    return format("User {} has no {} permission on {} or any of its parents",
            who, auth::permissions::to_string(cmd.permission), target);
}
// Builds the message used when the current user holds none of the permissions
// requested by `cmd` on its resource; the permission names are listed
// comma-separated. `_user` must be set.
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc_with_permission_set& cmd) const {
    const auto names = auth::permissions::to_strings(cmd.permission);
    const sstring perm_names = fmt::format("{}", fmt::join(names, ", "));
    const auto& who = *_user;
    return format("User {} has none of the permissions ({}) on {} or any of its parents",
            who, perm_names, cmd.resource);
}
// Resolves to true when the current user is granted what `cmd` asks for,
// either on `cmd.resource` itself or on one of its ancestors.
//
// Internal client states are always granted. Otherwise the permissions on the
// resource are fetched and matched against `cmd`; on a miss the same request
// is repeated for the parent resource until there is no parent left.
// `_user` must be set for non-internal states.
template <typename Cmd>
future<bool> service::client_state::check_has_permission(Cmd cmd) const {
    if (_is_internal) {
        co_return true;
    }

    // Determined up front, before the permission lookup suspends.
    const std::optional<auth::resource> enclosing = cmd.resource.parent();

    const auth::permission_set granted = co_await auth::get_permissions(*_auth_service, *_user, cmd.resource);
    if (intersects_permissions(granted, cmd)) {
        co_return true;
    }

    if (!enclosing) {
        co_return false;
    }
    co_return co_await check_has_permission<Cmd>({cmd.permission, *enclosing});
}
template future<bool> service::client_state::check_has_permission(auth::command_desc) const;
template future<bool> service::client_state::check_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
// Like check_has_permission(), but turns a negative answer into a future
// failed with exceptions::unauthorized_exception carrying the message from
// generate_authorization_error_msg().
//
// The continuation captures `this` and a copy of `cmd`, so this object must
// stay alive until the returned future resolves.
template <typename Cmd>
future<> service::client_state::ensure_has_permission(Cmd cmd) const {
    return check_has_permission(cmd).then([this, cmd](bool permitted) -> future<> {
        if (permitted) {
            return make_ready_future<>();
        }
        return make_exception_future<>(exceptions::unauthorized_exception(
                generate_authorization_error_msg(cmd)));
    });
}
template future<> service::client_state::ensure_has_permission(auth::command_desc) const;
template future<> service::client_state::ensure_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
// Makes `keyspace` the current keyspace of this client state.
//
// Once a user is set, the keyspace must exist in `db`, otherwise
// exceptions::invalid_request_exception is thrown.
void service::client_state::set_keyspace(replica::database& db, std::string_view keyspace) {
    // Skip keyspace validation for non-authenticated users. Apparently, some client libraries
    // call set_keyspace() before calling login(), and we have to handle that.
    if (_user) {
        if (!db.has_keyspace(keyspace)) {
            throw exceptions::invalid_request_exception(seastar::format("Keyspace '{}' does not exist", keyspace));
        }
    }
    _keyspace = sstring(keyspace);
}
// Resolves successfully when the auth service reports that resource `r`
// exists; otherwise the returned future fails with
// exceptions::invalid_request_exception.
//
// `r` is captured by reference in the continuation, so it must stay alive
// until the returned future resolves.
future<> service::client_state::ensure_exists(const auth::resource& r) const {
    return _auth_service->exists(r).then([&r](bool found) -> future<> {
        if (found) {
            return make_ready_future<>();
        }
        return make_exception_future<>(exceptions::invalid_request_exception(format("{} doesn't exist.", r)));
    });
}
// Refreshes the per-service-level parameters of this client state from the
// effective service level of the current user.
//
// Does nothing when there is no service level controller, no user, no user
// name, or when no effective service level is found for the user.
future<> service::client_state::maybe_update_per_service_level_params() {
    if (!_sl_controller || !_user || !_user->name) {
        co_return;
    }

    auto effective_slo = co_await _sl_controller->find_effective_service_level(_user->name.value());
    if (effective_slo) {
        update_per_service_level_params(*effective_slo);
    }
}
// Applies the service level options `slo` to this client state.
//
// Every entry of `_timeout_config` is set to the timeout carried by `slo`
// when it holds an actual duration, and to the corresponding entry of
// `_default_timeout_config` when the timeout is unset or marked for deletion.
// `_workload_type` is overwritten with `slo.workload`.
void service::client_state::update_per_service_level_params(qos::service_level_options& slo) {
    // Picks the service-level timeout if one is configured, `fallback` otherwise.
    const auto effective_timeout = [&slo] (const lowres_clock::duration& fallback) {
        using options = qos::service_level_options;
        return std::visit(overloaded_functor{
            [] (const lowres_clock::duration& configured) -> lowres_clock::duration {
                return configured;
            },
            [&fallback] (const options::unset_marker&) -> lowres_clock::duration {
                return fallback;
            },
            [&fallback] (const options::delete_marker&) -> lowres_clock::duration {
                return fallback;
            },
        }, slo.timeout);
    };

    const auto& defaults = _default_timeout_config;
    _timeout_config.read_timeout = effective_timeout(defaults.read_timeout);
    _timeout_config.write_timeout = effective_timeout(defaults.write_timeout);
    _timeout_config.range_read_timeout = effective_timeout(defaults.range_read_timeout);
    _timeout_config.counter_write_timeout = effective_timeout(defaults.counter_write_timeout);
    _timeout_config.truncate_timeout = effective_timeout(defaults.truncate_timeout);
    _timeout_config.cas_timeout = effective_timeout(defaults.cas_timeout);
    _timeout_config.other_timeout = effective_timeout(defaults.other_timeout);

    _workload_type = slo.workload;
}
// Stores the given client options in this client state.
//
// Every key and every value is looked up in `keys_and_values_cache` (and
// loaded with a default-constructed cache value when absent); the resulting
// cache entries are appended pairwise to `_client_options`.
//
// Both arguments are references that are used across suspension points, so
// they must stay alive until the returned future resolves.
future<> service::client_state::set_client_options(
        client_options_cache_type& keys_and_values_cache,
        const std::unordered_map<sstring, sstring>& client_options) {
    // Loader shared by keys and values: a missing entry is created empty.
    auto load_empty_entry = [] (const client_options_cache_key_type&) {
        return make_ready_future<options_cache_value_type>(options_cache_value_type{});
    };

    for (const auto& [option_name, option_value] : client_options) {
        auto name_entry = co_await keys_and_values_cache.get_or_load(option_name, load_empty_entry);
        auto value_entry = co_await keys_and_values_cache.get_or_load(option_value, load_empty_entry);
        _client_options.emplace_back(std::move(name_entry), std::move(value_entry));
    }
}