/*
 * Copyright (C) 2016-present ScyllaDB
 *
 * Modified by ScyllaDB
 */

/*
 * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
 */

// NOTE(review): this copy of the file appears to have lost text that was enclosed in
// angle brackets (e.g. `std::optional is_vector_indexed`, `std::unordered_set tmp`,
// `template future ...`, `auth::permission_set::of()`, and the bare `#include` below).
// The code tokens are reproduced exactly as found; restore the missing template
// arguments / header name from the authoritative source before compiling.

#include "client_state.hh"
#include "auth/authenticator.hh"
#include "auth/common.hh"
#include "auth/resource.hh"
#include "exceptions/exceptions.hh"
#include "db/system_keyspace.hh"
#include "db/schema_tables.hh"
#include "tracing/trace_keyspace_helper.hh"
#include "db/system_distributed_keyspace.hh"
#include "replica/database.hh"
#include "utils/overloaded_functor.hh"
// NOTE(review): the header name of the following #include is missing in this copy.
#include
#include "service/paxos/paxos_state.hh"

// Definition of the thread-local static member; starts at 0.
thread_local api::timestamp_type service::client_state::_last_timestamp_micros = 0;

// Records `user` as the logged-in user of this client state (moved into `_user`).
void service::client_state::set_login(auth::authenticated_user user) {
    _user = std::move(user);
}

// Verifies that the current user is allowed to log in.
//
// Anonymous users pass immediately. Otherwise the underlying role manager is asked
// whether the role named `*_user->name` exists and whether it can log in; an
// exceptions::authentication_exception is thrown if either answer is negative.
//
// NOTE(review): `_user` is dereferenced without checking that it is set; presumably
// callers invoke set_login() first - confirm.
future<> service::client_state::check_user_can_login() {
    if (auth::is_anonymous(*_user)) {
        co_return;
    }

    auto& role_manager = _auth_service->underlying_role_manager();

    const bool exists = co_await role_manager.exists(*_user->name);

    if (!exists) {
        throw exceptions::authentication_exception(
                format("User {} doesn't exist - create it with CREATE USER query first", *_user->name));
    }

    bool can_login = co_await role_manager.can_login(*_user->name);
    if (!can_login) {
        throw exceptions::authentication_exception(format("{} is not permitted to log in", *_user->name));
    }
}

// Throws exceptions::unauthorized_exception if no user has been set on this client state.
void service::client_state::validate_login() const {
    if (!_user) {
        throw exceptions::unauthorized_exception("You have not logged in");
    }
}

// Throws exceptions::unauthorized_exception unless a user is set and that user is
// not anonymous.
void service::client_state::ensure_not_anonymous() const {
    validate_login();
    if (auth::is_anonymous(*_user)) {
        throw exceptions::unauthorized_exception("You have to be logged in and not anonymous to perform this request");
    }
}

// Ensures the user holds permission `p` on the resource constructed from
// `auth::resource_kind::data` alone (presumably the root of all data resources - confirm).
// Internal client states skip the check; otherwise a login is required first.
future<> service::client_state::has_all_keyspaces_access(
                auth::permission p) const {
    if (_is_internal) {
        co_return;
    }
    validate_login();

    auth::resource r{auth::resource_kind::data};
    co_return co_await ensure_has_permission({p, r});
}

// Ensures permission `p` on the data resource of keyspace `ks`, via has_access().
future<> service::client_state::has_keyspace_access(const sstring& ks, auth::permission p) const {
    auth::resource r = auth::make_data_resource(ks);
    co_return co_await has_access(ks, {p, r});
}

// Ensures permission `p` on the resource returned by auth::make_functions_resource()
// with no arguments.
//
// Unlike has_all_keyspaces_access(), this goes straight to ensure_has_permission()
// without calling validate_login() itself.
future<> service::client_state::has_functions_access(auth::permission p) const {
    auth::resource r = auth::make_functions_resource();
    co_return co_await ensure_has_permission({p, r});
}

// Ensures permission `p` on the functions resource of keyspace `ks`, via has_access().
future<> service::client_state::has_functions_access(const sstring& ks, auth::permission p) const {
    auth::resource r = auth::make_functions_resource(ks);
    co_return co_await has_access(ks, {p, r});
}

// Ensures permission `p` on the functions resource identified by keyspace `ks` and
// `function_signature`, via has_access().
future<> service::client_state::has_function_access(const sstring& ks, const sstring& function_signature, auth::permission p) const {
    auth::resource r = auth::make_functions_resource(ks, function_signature);
    co_return co_await has_access(ks, {p, r});
}

// Ensures permission `p` on table `cf` of keyspace `ks`, via has_access().
// `t` is stored in the command descriptor and `is_vector_indexed` is forwarded
// unchanged to has_access().
// NOTE(review): `std::optional` appears to be missing its template argument in this copy.
future<> service::client_state::has_column_family_access(const sstring& ks, const sstring& cf, auth::permission p, auth::command_desc::type t, std::optional is_vector_indexed) const {
    auto r = auth::make_data_resource(ks, cf);
    co_return co_await has_access(ks, {p, r, t}, is_vector_indexed);
}

// Ensures permission `p` on the table described by schema `s` (keyspace and table
// names are taken from the schema), via has_access().
future<> service::client_state::has_schema_access(const schema& s, auth::permission p) const {
    auth::resource r = auth::make_data_resource(s.ks_name(), s.cf_name());
    co_return co_await has_access(s.ks_name(), {p, r});
}

// Same as the schema overload, with the keyspace and table given by name.
future<> service::client_state::has_schema_access(const sstring& ks_name, const sstring& cf_name, auth::permission p) const {
    auth::resource r = auth::make_data_resource(ks_name, cf_name);
    co_return co_await has_access(ks_name, {p, r});
}

// Applies the extra restrictions for internally managed tables described below.
// Returns a ready future when `cmd` is allowed and a failed future carrying
// exceptions::unauthorized_exception otherwise.
//
// The continuation attached to is_superuser() captures `cmd` by reference, so the
// object `cmd` refers to must stay alive until the returned future resolves.
//
// NOTE(review): `*_user->name` is dereferenced here without the
// `_user && !auth::is_anonymous(*_user)` guard that has_access() applies before its own
// is_superuser() call - confirm this path cannot be reached by an anonymous user.
future<> service::client_state::check_internal_table_permissions(std::string_view ks, std::string_view table_name, const auth::command_desc& cmd) const {
    // 1. CDC and $paxos tables are managed internally by Scylla. Users are prohibited
    // from running ALTER or DROP commands on them.
    // 2. Non-superusers are not allowed to access $paxos tables, even if explicit
    // permissions have been granted.
    // Note: This function is on a hot path; coroutines are avoided to reduce allocations.

    static constexpr auto forbidden_permissions = auth::permission_set::of<
        auth::permission::ALTER, auth::permission::DROP>();

    // Rule 1 for the CDC tables of the two system_distributed keyspaces.
    if (forbidden_permissions.contains(cmd.permission)) {
        if ((ks == db::system_distributed_keyspace::NAME || ks == db::system_distributed_keyspace::NAME_EVERYWHERE)
            && (table_name == db::system_distributed_keyspace::CDC_DESC_V2
            || table_name == db::system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION
            || table_name == db::system_distributed_keyspace::CDC_TIMESTAMPS
            || table_name == db::system_distributed_keyspace::CDC_GENERATIONS_V2)) {
            return make_exception_future(exceptions::unauthorized_exception(
                format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource)));
        }
    }

    // Tables for which try_get_base_table() yields a value are handled as $paxos tables.
    if (service::paxos::paxos_store::try_get_base_table(table_name)) {
        // Rule 1 for $paxos tables.
        if (forbidden_permissions.contains(cmd.permission)) {
            return make_exception_future(exceptions::unauthorized_exception(
                format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource)));
        }
        // Rule 2: any other permission requires the user to be a superuser.
        return _auth_service->underlying_role_manager().is_superuser(*_user->name)
            .then([&cmd](bool is_superuser) {
                return is_superuser
                    ? make_ready_future<>()
                    : make_exception_future(exceptions::unauthorized_exception(
                        format("Only superusers are allowed to {} {}",
                            auth::permissions::to_string(cmd.permission), cmd.resource)));
            });
    }

    return make_ready_future<>();
}

// Central access check for a command `cmd` targeting keyspace `ks`.
//
// The checks run in this order:
//  1. An empty `ks` raises exceptions::invalid_request_exception (even for internal
//     client states, because this test precedes the `_is_internal` test).
//  2. Internal client states are allowed without further checks.
//  3. A login is required (validate_login()).
//  4. For CREATE/ALTER/DROP: system keyspaces are rejected unless the command type is
//     ALTER_SYSTEM_WITH_ALLOWED_OPTS; DROP inside the tracing keyspace and DROP of the
//     legacy auth keyspace are rejected.
//  5. SELECT on one of the `readable_system_resources` is allowed without a
//     permission lookup.
//  6. For CREATE/ALTER/DROP: resources for which auth::is_protected() is true are rejected.
//  7. For data resources that name a table: check_internal_table_permissions().
//  8. For data resources in a system keyspace, a logged-in, non-anonymous, non-superuser
//     user may only use SELECT or DESCRIBE.
//  9. Finally the permission itself is verified with ensure_has_permission(); a SELECT
//     on a data resource with `is_vector_indexed` holding true is checked against a
//     permission set instead of the single permission.
//
// Rejections in steps 4, 6 and 8 are raised as exceptions::unauthorized_exception.
//
// NOTE(review): `std::optional`, `std::unordered_set` and `auth::permission_set::of()`
// below appear to be missing their template arguments in this copy.
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd, std::optional is_vector_indexed) const {
    if (ks.empty()) {
        throw exceptions::invalid_request_exception("You have not set a keyspace for this session");
    }
    if (_is_internal) {
        co_return;
    }

    validate_login();

    static const auto alteration_permissions = auth::permission_set::of<
            auth::permission::CREATE, auth::permission::ALTER, auth::permission::DROP>();

    // we only care about schema modification.
    if (alteration_permissions.contains(cmd.permission)) {
        // prevent system keyspace modification
        // (the comparison is done on a lower-cased copy of the keyspace name; the error
        // message still uses the name as given)
        auto name = ks;
        std::transform(name.begin(), name.end(), name.begin(), ::tolower);
        if (is_system_keyspace(name) && cmd.type_ != auth::command_desc::type::ALTER_SYSTEM_WITH_ALLOWED_OPTS) {
            throw exceptions::unauthorized_exception(ks + " keyspace is not user-modifiable.");
        }

        //
        // we want to disallow dropping any contents of TRACING_KS and disallow dropping the `auth::meta::legacy::AUTH_KS`
        // keyspace.
        //

        const bool dropping_anything_in_tracing = (name == tracing::trace_keyspace_helper::KEYSPACE_NAME)
                && (cmd.permission == auth::permission::DROP);

        const bool dropping_auth_keyspace = (cmd.resource == auth::make_data_resource(auth::meta::legacy::AUTH_KS))
                && (cmd.permission == auth::permission::DROP);

        if (dropping_anything_in_tracing || dropping_auth_keyspace) {
            throw exceptions::unauthorized_exception(
                    format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource));
        }
    }

    // Built once per thread on first use: the LOCAL and PEERS tables of the system
    // keyspace plus every table reported by schema_tables::all_table_infos() for the
    // full feature set.
    static thread_local std::unordered_set readable_system_resources = [] {
        std::unordered_set tmp;
        for (auto cf : { db::system_keyspace::LOCAL, db::system_keyspace::PEERS }) {
            tmp.insert(auth::make_data_resource(db::system_keyspace::NAME, cf));
        }
        for (const auto& cf : db::schema_tables::all_table_infos(db::schema_features::full())) {
            tmp.insert(auth::make_data_resource(db::schema_tables::NAME, cf.name));
        }
        return tmp;
    }();

    if (cmd.permission == auth::permission::SELECT && readable_system_resources.contains(cmd.resource)) {
        co_return;
    }
    if (alteration_permissions.contains(cmd.permission)) {
        if (auth::is_protected(*_auth_service, cmd)) {
            throw exceptions::unauthorized_exception(format("{} is protected", cmd.resource));
        }
    }

    if (cmd.resource.kind() == auth::resource_kind::data) {
        const auto resource_view = auth::data_resource_view(cmd.resource);
        if (resource_view.table()) {
            co_await check_internal_table_permissions(ks, *resource_view.table(), cmd);
        }
    }

    // The is_superuser() lookup is the last operand, so it is only awaited when all the
    // preceding operands are true. Note that `ks` is used here as given, not the
    // lower-cased copy used above.
    if (cmd.resource.kind() == auth::resource_kind::data
            && !(cmd.permission == auth::permission::SELECT || cmd.permission == auth::permission::DESCRIBE)
            && is_system_keyspace(ks)
            && _user
            && !auth::is_anonymous(*_user)
            && !co_await _auth_service->underlying_role_manager().is_superuser(*_user->name)) [[unlikely]] {
        throw exceptions::unauthorized_exception(
                ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
    }

    if (cmd.resource.kind() == auth::resource_kind::data
            && cmd.permission == auth::permission::SELECT
            && is_vector_indexed.has_value()
            && is_vector_indexed.value()) {
        co_return co_await ensure_has_permission({auth::permission_set::of(), cmd.resource});
    }

    co_return co_await ensure_has_permission(cmd);
}

// Permission-set command: true when `permissions` intersects the set requested by `cmd`.
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc_with_permission_set& cmd) {
    return permissions.intersects(cmd.permission);
}

// Single-permission command: true when `permissions` contains the permission requested
// by `cmd`.
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc& cmd) {
    return permissions.contains(cmd.permission);
}

// Builds the "no permission" error text for a single-permission command.
// Dereferences `_user`, so a user must be set.
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc& cmd) const {
    return format("User {} has no {} permission on {} or any of its parents",
                *_user,
                auth::permissions::to_string(cmd.permission),
                cmd.resource);
}

// Builds the "none of the permissions" error text for a permission-set command; the
// permission names are joined with ", ". Dereferences `_user`, so a user must be set.
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc_with_permission_set& cmd) const {
    sstring perm_names = fmt::format("{}", fmt::join(auth::permissions::to_strings(cmd.permission), ", "));
    return format("User {} has none of the permissions ({}) on {} or any of its parents",
                *_user,
                perm_names,
                cmd.resource);
}

// Resolves to true when the user holds the permission(s) requested by `cmd` on
// `cmd.resource` or on any resource reached by repeatedly following `parent()`.
//
// Internal client states always resolve to true. Otherwise the permissions granted
// on the resource are fetched with auth::get_permissions(); if they do not satisfy
// `cmd` and the resource has a parent, the same check is repeated on the parent.
// Resolves to false once a resource without a parent fails the check.
//
// NOTE(review): the template parameter list, the `future` value type, the
// `std::optional` argument and the explicit template argument of the recursive call
// appear to be missing in this copy.
template
future service::client_state::check_has_permission(Cmd cmd) const {
    if (_is_internal) {
        co_return true;
    }

    std::optional parent_r = cmd.resource.parent();

    auth::permission_set set = co_await auth::get_permissions(*_auth_service, *_user, cmd.resource);
    if (intersects_permissions(set, cmd)) {
        co_return true;
    }
    if (parent_r) {
        co_return co_await check_has_permission({cmd.permission, *parent_r});
    }
    co_return false;
}

// Explicit instantiations for the two command descriptor types.
template future service::client_state::check_has_permission(auth::command_desc) const;
template future service::client_state::check_has_permission(auth::command_desc_with_permission_set) const;

// Like check_has_permission(), but a negative result becomes a failed future carrying
// exceptions::unauthorized_exception with the text from generate_authorization_error_msg().
// `cmd` is copied into the continuation; `this` is captured by pointer.
//
// NOTE(review): the template parameter list appears to be missing in this copy.
template
future<> service::client_state::ensure_has_permission(Cmd cmd) const {
    return check_has_permission(cmd).then([this, cmd](bool ok) {
        if (!ok) {
            return make_exception_future<>(exceptions::unauthorized_exception(
                generate_authorization_error_msg(cmd)));
        }
        return make_ready_future<>();
    });
}

// Explicit instantiations for the two command descriptor types.
template future<> service::client_state::ensure_has_permission(auth::command_desc) const;
template future<> service::client_state::ensure_has_permission(auth::command_desc_with_permission_set) const;

// Sets the current keyspace of this client state to `keyspace`.
// When a user is set and `db` has no such keyspace, throws
// exceptions::invalid_request_exception and leaves `_keyspace` unchanged.
void service::client_state::set_keyspace(replica::database& db, std::string_view keyspace) {
    // Skip keyspace validation for non-authenticated users. Apparently, some client libraries
    // call set_keyspace() before calling login(), and we have to handle that.
    if (_user && !db.has_keyspace(keyspace)) {
        throw exceptions::invalid_request_exception(seastar::format("Keyspace '{}' does not exist", keyspace));
    }
    _keyspace = sstring(keyspace);
}

// Resolves successfully when the auth service reports that resource `r` exists;
// otherwise fails with exceptions::invalid_request_exception.
//
// The continuation captures `r` by reference, so the object `r` refers to must stay
// alive until the returned future resolves.
future<> service::client_state::ensure_exists(const auth::resource& r) const {
    return _auth_service->exists(r).then([&r](bool exists) {
        if (!exists) {
            return make_exception_future<>(exceptions::invalid_request_exception(
                    format("{} doesn't exist.", r)));
        }

        return make_ready_future<>();
    });
}

// Looks up the effective service level for the current user and, if one is found,
// applies it with update_per_service_level_params().
// Does nothing when there is no service level controller, no user, or the user has
// no name.
future<> service::client_state::maybe_update_per_service_level_params() {
    if (_sl_controller && _user && _user->name) {
        auto slo_opt = co_await _sl_controller->find_effective_service_level(_user->name.value());
        if (!slo_opt) {
            co_return;
        }
        update_per_service_level_params(*slo_opt);
    }
}

// Applies the service level options `slo` to this client state.
//
// Every field of `_timeout_config` assigned below is set to `slo.timeout` when that
// holds a duration, and to the matching field of `_default_timeout_config` when it
// holds the unset or delete marker. `_workload_type` is set to `slo.workload`.
void service::client_state::update_per_service_level_params(qos::service_level_options& slo) {
    // Returns the service level's timeout, or `default_timeout` when none is configured.
    auto slo_timeout_or = [&] (const lowres_clock::duration& default_timeout) {
        return std::visit(overloaded_functor{
            [&] (const qos::service_level_options::unset_marker&) -> lowres_clock::duration {
                return default_timeout;
            },
            [&] (const qos::service_level_options::delete_marker&) -> lowres_clock::duration {
                return default_timeout;
            },
            [&] (const lowres_clock::duration& d) -> lowres_clock::duration {
                return d;
            },
        }, slo.timeout);
    };
    _timeout_config.read_timeout = slo_timeout_or(_default_timeout_config.read_timeout);
    _timeout_config.write_timeout = slo_timeout_or(_default_timeout_config.write_timeout);
    _timeout_config.range_read_timeout = slo_timeout_or(_default_timeout_config.range_read_timeout);
    _timeout_config.counter_write_timeout = slo_timeout_or(_default_timeout_config.counter_write_timeout);
    _timeout_config.truncate_timeout = slo_timeout_or(_default_timeout_config.truncate_timeout);
    _timeout_config.cas_timeout = slo_timeout_or(_default_timeout_config.cas_timeout);
    _timeout_config.other_timeout = slo_timeout_or(_default_timeout_config.other_timeout);

    _workload_type = slo.workload;
}