In this series we add support for forwarding strongly consistent CQL requests to suitable replicas, so that clients can issue reads/writes to any node and have the request executed on an appropriate tablet replica (and, for writes, on the Raft leader). We return the same CQL response that the user would get by sending the request directly to the correct replica, and we perform the same logging/stats updates on the request coordinator as if the coordinator were the appropriate replica. The core mechanism of forwarding a strongly consistent request is sending an RPC containing the user's CQL request frame to the appropriate replica and receiving back a ready, serialized `cql_transport::response`. We do this in the CQL server: it is the component best prepared for handling these types, and forwarding a request containing a CQL frame allows us to reuse near-top-level methods for CQL request handling in the new RPC handler (such as the general `process`). For sending the RPC, the CQL server needs to know which node it should forward the request to. This requires knowledge about the tablet Raft group members and leader. We obtain this information during the execution of a `cql3/strong_consistency` statement, and we return it to the CQL server using the generalized `bounce_to_shard` `result_message`, in which we now store either a shard or a specific replica to forward to. Similarly to `bounce_to_shard`, we need to handle this `result_message` in a loop - a replica may move during statement execution, or the Raft leader can change. We also use it for forwarding strongly consistent writes when we're not a member of the affected tablet Raft group. In that case we need to forward the statement twice: first to any replica of the affected tablet, which can then find the leader and return this information to the coordinator, allowing the second request to be directed to the leader.
This feature also allows passing through exception messages raised on the target replica while executing the statement. For that, many methods of `cql_transport::cql_server::connection` for creating error responses needed to be moved to `cql_transport::cql_server`. For final exception handling on the coordinator, we added additional error info to the RPC response, so that the handling can be performed without having the `result_message::exception` or the `exception_ptr` itself. Fixes [SCYLLADB-71](https://scylladb.atlassian.net/browse/SCYLLADB-71) [SCYLLADB-71]: https://scylladb.atlassian.net/browse/SCYLLADB-71?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Closes scylladb/scylladb#27517 * github.com:scylladb/scylladb: test: add tests for CQL forwarding transport: enable CQL forwarding for strong consistency statements transport: add remote statement preparation for CQL forwarding transport: handle redirect responses in CQL forwarding transport: add exception handling for forwarded CQL requests transport: add basic CQL request forwarding idl: add a representation of client_state for forwarding cql_server: handle query, execute, batch in one case transport: inline process_on_shard in cql_server::process transport: extract process() to cql_server transport: add messaging_service to cql_server transport: add response reconstruction helpers for forwarding transport: generalize the bounce result message for bouncing to other nodes strong consistency: redirect requests to live replicas from the same rack transport: pass foreign_ptr into sleep_until_timeout_passes and move it to cql_server transport: extract the error handling from process_request_one transport: move error response helpers from connection to cql_server
395 lines
17 KiB
C++
395 lines
17 KiB
C++
/*
|
|
* Copyright (C) 2016-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#include "client_state.hh"
|
|
#include "auth/authenticator.hh"
|
|
#include "auth/common.hh"
|
|
#include "auth/resource.hh"
|
|
#include "exceptions/exceptions.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "tracing/trace_keyspace_helper.hh"
|
|
#include "db/system_distributed_keyspace.hh"
|
|
#include "replica/database.hh"
|
|
#include "utils/overloaded_functor.hh"
|
|
#include <seastar/core/coroutine.hh>
|
|
#include "service/paxos/paxos_state.hh"
|
|
|
|
// Definition of the static member `_last_timestamp_micros`: one copy per thread, starting at zero.
thread_local api::timestamp_type service::client_state::_last_timestamp_micros{0};
|
|
|
|
// Stores `user` as the session's user, moving the passed-in value into place.
void service::client_state::set_login(auth::authenticated_user user) {
    this->_user = std::move(user);
}
|
|
|
|
// Verifies that the session's user may log in.
//
// Anonymous users are accepted without any lookup. For a named user, the role
// manager must report both that the role exists and that it can log in;
// otherwise the returned future fails with exceptions::authentication_exception.
future<> service::client_state::check_user_can_login() {
    if (auth::is_anonymous(*_user)) {
        co_return;
    }

    auto& roles = _auth_service->underlying_role_manager();

    if (!co_await roles.exists(*_user->name)) {
        throw exceptions::authentication_exception(
                format("User {} doesn't exist - create it with CREATE USER query first", *_user->name));
    }

    if (!co_await roles.can_login(*_user->name)) {
        throw exceptions::authentication_exception(format("{} is not permitted to log in", *_user->name));
    }
}
|
|
|
|
void service::client_state::validate_login() const {
|
|
if (!_user) {
|
|
throw exceptions::unauthorized_exception("You have not logged in");
|
|
}
|
|
}
|
|
|
|
void service::client_state::ensure_not_anonymous() const {
|
|
if (_bypass_auth_checks) {
|
|
return;
|
|
}
|
|
validate_login();
|
|
if (auth::is_anonymous(*_user)) {
|
|
throw exceptions::unauthorized_exception("You have to be logged in and not anonymous to perform this request");
|
|
}
|
|
}
|
|
|
|
// Resolves to true when auth checks are bypassed, false when no user is set, and
// otherwise to whatever auth::has_superuser reports for the current user.
future<bool> service::client_state::has_superuser() const {
    if (_bypass_auth_checks) {
        co_return true;
    }
    if (_user) {
        co_return co_await auth::has_superuser(*_auth_service, *_user);
    }
    co_return false;
}
|
|
|
|
// Requires permission `p` on the root data resource (all keyspaces).
// Does nothing when auth checks are bypassed; otherwise a login is required.
future<> service::client_state::has_all_keyspaces_access(auth::permission p) const {
    if (!_bypass_auth_checks) {
        validate_login();
        const auth::resource all_data{auth::resource_kind::data};
        co_await ensure_has_permission({p, all_data});
    }
}
|
|
|
|
// Requires permission `p` on the data resource of keyspace `ks` (checked via has_access).
future<> service::client_state::has_keyspace_access(const sstring& ks, auth::permission p) const {
    co_return co_await has_access(ks, {p, auth::make_data_resource(ks)});
}
|
|
|
|
// Requires permission `p` on the root functions resource.
future<> service::client_state::has_functions_access(auth::permission p) const {
    co_return co_await ensure_has_permission({p, auth::make_functions_resource()});
}
|
|
|
|
// Requires permission `p` on the functions resource of keyspace `ks` (checked via has_access).
future<> service::client_state::has_functions_access(const sstring& ks, auth::permission p) const {
    co_return co_await has_access(ks, {p, auth::make_functions_resource(ks)});
}
|
|
|
|
// Requires permission `p` on one function, identified by keyspace `ks` and
// `function_signature` (checked via has_access).
future<> service::client_state::has_function_access(const sstring& ks, const sstring& function_signature, auth::permission p) const {
    co_return co_await has_access(ks, {p, auth::make_functions_resource(ks, function_signature)});
}
|
|
|
|
// Requires permission `p` (for command type `t`) on table `ks`.`cf`.
// `is_vector_indexed` is forwarded to has_access, which relaxes SELECT for vector-indexed tables.
future<> service::client_state::has_column_family_access(const sstring& ks,
        const sstring& cf, auth::permission p, auth::command_desc::type t, std::optional<bool> is_vector_indexed) const {
    co_return co_await has_access(ks, {p, auth::make_data_resource(ks, cf), t}, is_vector_indexed);
}
|
|
|
|
// Applies extra restrictions to tables that are managed internally.
//
// Returns a ready future when the command is allowed, and a future failed with
// exceptions::unauthorized_exception otherwise:
//  - ALTER/DROP of the CDC description/topology/timestamps/generations tables in
//    the system_distributed keyspaces is rejected;
//  - ALTER/DROP of any $paxos table (one for which paxos_store::try_get_base_table
//    finds a base table) is rejected;
//  - any other command on a $paxos table requires a superuser.
//
// The superuser lookup continuation captures `cmd` by reference, so the caller
// must keep `cmd` alive until the returned future resolves.
future<> service::client_state::check_internal_table_permissions(std::string_view ks, std::string_view table_name, const auth::command_desc& cmd) const {
    // 1. CDC and $paxos tables are managed internally by Scylla. Users are prohibited
    //    from running ALTER or DROP commands on them.
    // 2. Non-superusers are not allowed to access $paxos tables, even if explicit
    //    permissions have been granted.
    // Note: This function is on a hot path; coroutines are avoided to reduce allocations.

    static constexpr auto forbidden_permissions = auth::permission_set::of<
            auth::permission::ALTER, auth::permission::DROP>();

    if (forbidden_permissions.contains(cmd.permission)) {
        if ((ks == db::system_distributed_keyspace::NAME || ks == db::system_distributed_keyspace::NAME_EVERYWHERE)
                && (table_name == db::system_distributed_keyspace::CDC_DESC_V2
                    || table_name == db::system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION
                    || table_name == db::system_distributed_keyspace::CDC_TIMESTAMPS
                    || table_name == db::system_distributed_keyspace::CDC_GENERATIONS_V2)) {
            return make_exception_future(exceptions::unauthorized_exception(
                    format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource)));
        }
    }

    if (service::paxos::paxos_store::try_get_base_table(table_name)) {
        if (forbidden_permissions.contains(cmd.permission)) {
            return make_exception_future(exceptions::unauthorized_exception(
                    format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource)));
        }

        // Only a named user has a role to look up: dereferencing `_user->name` for a
        // missing or anonymous user would read a disengaged optional (UB). As in
        // has_access(), the superuser-only restriction is applied to named users only.
        if (!_user || auth::is_anonymous(*_user)) {
            return make_ready_future<>();
        }

        return _auth_service->underlying_role_manager().is_superuser(*_user->name)
                .then([&cmd] (bool is_superuser) {
                    return is_superuser
                            ? make_ready_future<>()
                            : make_exception_future(exceptions::unauthorized_exception(
                                    format("Only superusers are allowed to {} {}",
                                            auth::permissions::to_string(cmd.permission), cmd.resource)));
                });
    }

    return make_ready_future<>();
}
|
|
|
|
// Checks that this session may perform `cmd` on an object in keyspace `ks`.
//
// Fails (via the returned future) with exceptions::invalid_request_exception when
// `ks` is empty, and with exceptions::unauthorized_exception when the command is
// forbidden. When `_bypass_auth_checks` is set, only the empty-keyspace check runs.
// Otherwise a login is required, and the checks run in this order:
//  - CREATE/ALTER/DROP of a system keyspace is rejected unless the command type is
//    ALTER_SYSTEM_WITH_ALLOWED_OPTS;
//  - DROP of anything in the tracing keyspace, or of the legacy auth keyspace, is rejected;
//  - SELECT on system.local, system.peers and the schema tables is always allowed;
//  - CREATE/ALTER/DROP of a resource that auth::is_protected reports is rejected;
//  - table resources go through check_internal_table_permissions();
//  - named non-superusers may only get SELECT or DESCRIBE on system keyspaces;
//  - SELECT on a vector-indexed table (or on a few system tables) is accepted with
//    either the SELECT or the VECTOR_SEARCH_INDEXING permission;
//  - otherwise ensure_has_permission(cmd) decides.
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd, std::optional<bool> is_vector_indexed) const {
    if (ks.empty()) {
        throw exceptions::invalid_request_exception("You have not set a keyspace for this session");
    }
    if (_bypass_auth_checks) {
        co_return;
    }

    validate_login();

    static constexpr auto alteration_permissions = auth::permission_set::of<
            auth::permission::CREATE, auth::permission::ALTER, auth::permission::DROP>();

    // we only care about schema modification.
    if (alteration_permissions.contains(cmd.permission)) {
        // prevent system keyspace modification
        auto name = ks;
        // Go through unsigned char: passing a negative char value to ::tolower is undefined behavior.
        std::transform(name.begin(), name.end(), name.begin(), [] (unsigned char c) {
            return static_cast<char>(::tolower(c));
        });
        if (is_system_keyspace(name) && cmd.type_ != auth::command_desc::type::ALTER_SYSTEM_WITH_ALLOWED_OPTS) {
            throw exceptions::unauthorized_exception(ks + " keyspace is not user-modifiable.");
        }

        //
        // we want to disallow dropping any contents of TRACING_KS and disallow dropping the `auth::meta::legacy::AUTH_KS`
        // keyspace.
        //

        const bool dropping_anything_in_tracing = (name == tracing::trace_keyspace_helper::KEYSPACE_NAME)
                && (cmd.permission == auth::permission::DROP);

        const bool dropping_auth_keyspace = (cmd.resource == auth::make_data_resource(auth::meta::legacy::AUTH_KS))
                && (cmd.permission == auth::permission::DROP);

        if (dropping_anything_in_tracing || dropping_auth_keyspace) {
            throw exceptions::unauthorized_exception(
                    format("Cannot {} {}", auth::permissions::to_string(cmd.permission), cmd.resource));
        }
    }

    static thread_local std::unordered_set<auth::resource> readable_system_resources = [] {
        std::unordered_set<auth::resource> tmp;
        for (auto cf : { db::system_keyspace::LOCAL, db::system_keyspace::PEERS }) {
            tmp.insert(auth::make_data_resource(db::system_keyspace::NAME, cf));
        }
        for (const auto& cf : db::schema_tables::all_table_infos(db::schema_features::full())) {
            tmp.insert(auth::make_data_resource(db::schema_tables::NAME, cf.name));
        }
        return tmp;
    }();

    if (cmd.permission == auth::permission::SELECT && readable_system_resources.contains(cmd.resource)) {
        co_return;
    }
    if (alteration_permissions.contains(cmd.permission)) {
        if (auth::is_protected(*_auth_service, cmd)) {
            throw exceptions::unauthorized_exception(format("{} is protected", cmd.resource));
        }
    }

    if (cmd.resource.kind() == auth::resource_kind::data) {
        const auto resource_view = auth::data_resource_view(cmd.resource);
        if (resource_view.table()) {
            co_await check_internal_table_permissions(ks, *resource_view.table(), cmd);
        }
    }

    if (cmd.resource.kind() == auth::resource_kind::data
            && !(cmd.permission == auth::permission::SELECT || cmd.permission == auth::permission::DESCRIBE)
            && is_system_keyspace(ks)
            && _user
            && !auth::is_anonymous(*_user)
            && !co_await _auth_service->underlying_role_manager().is_superuser(*_user->name)) [[unlikely]] {
        throw exceptions::unauthorized_exception(
                ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
    }

    static const std::unordered_set<auth::resource> vector_search_system_resources = {
        auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY),
        auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::VERSIONS),
        auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::CDC_STREAMS),
        auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::CDC_TIMESTAMPS),
    };

    const bool selecting_vector_indexed_table = cmd.resource.kind() == auth::resource_kind::data
            && cmd.permission == auth::permission::SELECT
            && is_vector_indexed.value_or(false);
    const bool selecting_vector_search_system_table = cmd.permission == auth::permission::SELECT
            && vector_search_system_resources.contains(cmd.resource);

    if (selecting_vector_indexed_table || selecting_vector_search_system_table) {
        co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});
    }

    co_return co_await ensure_has_permission(cmd);
}
|
|
|
|
// Overload for a command carrying a set of acceptable permissions: true when
// `permissions` intersects that set.
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc_with_permission_set& cmd) {
    const auto& acceptable = cmd.permission;
    return permissions.intersects(acceptable);
}
|
|
|
|
// Overload for a command needing a single permission: true when `permissions` contains it.
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc& cmd) {
    const auto required = cmd.permission;
    return permissions.contains(required);
}
|
|
|
|
// Builds the "missing permission" message for a command that needs one permission.
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc& cmd) const {
    const auto& permission_name = auth::permissions::to_string(cmd.permission);
    return format("User {} has no {} permission on {} or any of its parents", *_user, permission_name, cmd.resource);
}
|
|
|
|
// Builds the "missing permission" message for a command that accepts any of several
// permissions; the permission names are listed comma-separated.
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc_with_permission_set& cmd) const {
    auto names = auth::permissions::to_strings(cmd.permission);
    const sstring joined_names = fmt::format("{}", fmt::join(names, ", "));
    return format("User {} has none of the permissions ({}) on {} or any of its parents", *_user, joined_names, cmd.resource);
}
|
|
|
|
template <typename Cmd>
|
|
future<bool> service::client_state::check_has_permission(Cmd cmd) const {
|
|
if (_bypass_auth_checks) {
|
|
co_return true;
|
|
}
|
|
|
|
std::optional<auth::resource> parent_r = cmd.resource.parent();
|
|
|
|
auth::permission_set set = co_await auth::get_permissions(*_auth_service, *_user, cmd.resource);
|
|
if (intersects_permissions(set, cmd)) {
|
|
co_return true;
|
|
}
|
|
if (parent_r) {
|
|
co_return co_await check_has_permission<Cmd>({cmd.permission, *parent_r});
|
|
}
|
|
co_return false;
|
|
}
|
|
// Explicit instantiations of check_has_permission for both command descriptor kinds.
template future<bool> service::client_state::check_has_permission<auth::command_desc>(auth::command_desc) const;
template future<bool> service::client_state::check_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
|
|
|
|
template <typename Cmd>
|
|
future<> service::client_state::ensure_has_permission(Cmd cmd) const {
|
|
return check_has_permission(cmd).then([this, cmd](bool ok) {
|
|
if (!ok) {
|
|
return make_exception_future<>(exceptions::unauthorized_exception(
|
|
generate_authorization_error_msg(cmd)));
|
|
}
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
// Explicit instantiations of ensure_has_permission for both command descriptor kinds.
template future<> service::client_state::ensure_has_permission<auth::command_desc>(auth::command_desc) const;
template future<> service::client_state::ensure_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
|
|
|
|
// Sets the session's current keyspace. When a user is set, the keyspace must exist
// in `db`; otherwise exceptions::invalid_request_exception is thrown.
void service::client_state::set_keyspace(replica::database& db, std::string_view keyspace) {
    // Skip keyspace validation for non-authenticated users. Apparently, some client libraries
    // call set_keyspace() before calling login(), and we have to handle that.
    const bool has_user = static_cast<bool>(_user);
    if (has_user && !db.has_keyspace(keyspace)) {
        throw exceptions::invalid_request_exception(seastar::format("Keyspace '{}' does not exist", keyspace));
    }
    _keyspace = sstring(keyspace);
}
|
|
|
|
// Fails with exceptions::invalid_request_exception when the auth service reports that
// `r` does not exist. The continuation captures `r` by reference, so the caller must
// keep it alive until the returned future resolves.
future<> service::client_state::ensure_exists(const auth::resource& r) const {
    return _auth_service->exists(r).then([&r] (bool found) {
        if (found) {
            return make_ready_future<>();
        }
        return make_exception_future<>(exceptions::invalid_request_exception(format("{} doesn't exist.", r)));
    });
}
|
|
|
|
void service::client_state::maybe_update_per_service_level_params() {
|
|
if (_sl_controller && _user && _user->name) {
|
|
auto slo_opt = _sl_controller->find_cached_effective_service_level(_user->name.value());
|
|
if (!slo_opt) {
|
|
return;
|
|
}
|
|
update_per_service_level_params(*slo_opt);
|
|
}
|
|
}
|
|
|
|
// Recomputes the session's timeouts and workload type from service level `slo`.
// An explicit timeout in `slo` replaces every timeout in `_timeout_config`. When the
// timeout is an unset or delete marker, each timeout falls back to its value in
// `_default_timeout_config`.
void service::client_state::update_per_service_level_params(qos::service_level_options& slo) {
    const auto effective_timeout = [&slo] (const lowres_clock::duration& fallback) -> lowres_clock::duration {
        return std::visit(overloaded_functor{
            [] (const lowres_clock::duration& explicit_timeout) -> lowres_clock::duration {
                return explicit_timeout;
            },
            [&fallback] (const qos::service_level_options::unset_marker&) -> lowres_clock::duration {
                return fallback;
            },
            [&fallback] (const qos::service_level_options::delete_marker&) -> lowres_clock::duration {
                return fallback;
            },
        }, slo.timeout);
    };

    const auto& defaults = _default_timeout_config;
    _timeout_config.read_timeout = effective_timeout(defaults.read_timeout);
    _timeout_config.write_timeout = effective_timeout(defaults.write_timeout);
    _timeout_config.range_read_timeout = effective_timeout(defaults.range_read_timeout);
    _timeout_config.counter_write_timeout = effective_timeout(defaults.counter_write_timeout);
    _timeout_config.truncate_timeout = effective_timeout(defaults.truncate_timeout);
    _timeout_config.cas_timeout = effective_timeout(defaults.cas_timeout);
    _timeout_config.other_timeout = effective_timeout(defaults.other_timeout);

    _workload_type = slo.workload;
}
|
|
|
|
// Appends one (key, value) entry to `_client_options` for each client option.
// Both the key and the value are obtained from `keys_and_values_cache` through
// get_or_load; a missing entry is loaded as a default-constructed cache value.
future<> service::client_state::set_client_options(
        client_options_cache_type& keys_and_values_cache,
        const std::unordered_map<sstring, sstring>& client_options) {
    auto load_empty = [] (const client_options_cache_key_type&) {
        return make_ready_future<options_cache_value_type>(options_cache_value_type{});
    };

    for (const auto& option : client_options) {
        auto key_entry = co_await keys_and_values_cache.get_or_load(option.first, load_empty);
        auto value_entry = co_await keys_and_values_cache.get_or_load(option.second, load_empty);
        _client_options.emplace_back(std::move(key_entry), std::move(value_entry));
    }
}
|
|
|
|
service::forwarded_client_state::forwarded_client_state(
|
|
sstring keyspace,
|
|
std::optional<sstring> username,
|
|
::timeout_config timeout_config,
|
|
uint64_t protocol_extensions_mask,
|
|
gms::inet_address remote_address,
|
|
uint16_t remote_port)
|
|
: keyspace(std::move(keyspace))
|
|
, username(std::move(username))
|
|
, timeout_config(std::move(timeout_config))
|
|
, protocol_extensions_mask(protocol_extensions_mask)
|
|
, remote_address(std::move(remote_address))
|
|
, remote_port(remote_port)
|
|
{ }
|
|
|
|
service::forwarded_client_state::forwarded_client_state(const client_state& cs)
|
|
: keyspace(cs.get_raw_keyspace())
|
|
, username(cs.user() ? std::optional<sstring>{cs.user()->name} : std::nullopt)
|
|
, timeout_config(cs.get_timeout_config())
|
|
, protocol_extensions_mask(static_cast<uint64_t>(cs.get_protocol_extensions().mask()))
|
|
, remote_address(cs.get_client_address())
|
|
, remote_port(cs.get_client_port())
|
|
{ }
|