In this series we add support for forwarding strongly consistent CQL requests to suitable replicas, so that clients can issue reads/writes to any node and have the request executed on an appropriate tablet replica (and, for writes, on the Raft leader). We return the same CQL response that the user would get when sending the request directly to the correct replica, and we perform the same logging/stats updates on the request coordinator as if the coordinator were the appropriate replica. The core mechanism for forwarding a strongly consistent request is sending an RPC containing the user's CQL request frame to the appropriate replica and receiving back a ready, serialized `cql_transport::response`. We do this in the CQL server: it is best prepared for handling these types, and forwarding a request containing a CQL frame allows us to reuse near-top-level CQL request-handling methods (such as the general `process`) in the new RPC handler. To send the RPC, the CQL server needs to know which node it should forward the request to. This requires knowledge of the tablet Raft group members and leader. We obtain this information during the execution of a `cql3/strong_consistency` statement, and we return it to the CQL server using the generalized `bounce_to_shard` `result_message`, which now stores either a shard or a specific replica to forward to. As with `bounce_to_shard`, we need to handle this `result_message` in a loop: a replica may move during statement execution, or the Raft leader can change. We also use it for forwarding strongly consistent writes when we're not a member of the affected tablet Raft group. In that case we need to forward the statement twice: first to any replica of the affected tablet, which can find the leader and return this information to the coordinator, allowing the second request to be directed to the leader.
This feature also allows passing through exception messages that occurred on the target replica while executing the statement. For that, many methods of `cql_transport::cql_server::connection` for creating error responses needed to be moved to `cql_transport::cql_server`. For final exception handling on the coordinator, we added additional error info to the RPC response, so that the handling can be performed without having the `result_message::exception` or `exception_ptr` itself. Fixes [SCYLLADB-71](https://scylladb.atlassian.net/browse/SCYLLADB-71) [SCYLLADB-71]: https://scylladb.atlassian.net/browse/SCYLLADB-71?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Closes scylladb/scylladb#27517 * github.com:scylladb/scylladb: test: add tests for CQL forwarding transport: enable CQL forwarding for strong consistency statements transport: add remote statement preparation for CQL forwarding transport: handle redirect responses in CQL forwarding transport: add exception handling for forwarded CQL requests transport: add basic CQL request forwarding idl: add a representation of client_state for forwarding cql_server: handle query, execute, batch in one case transport: inline process_on_shard in cql_server::process transport: extract process() to cql_server transport: add messaging_service to cql_server transport: add response reconstruction helpers for forwarding transport: generalize the bounce result message for bouncing to other nodes strong consistency: redirect requests to live replicas from the same rack transport: pass foreign_ptr into sleep_until_timeout_passes and move it to cql_server transport: extract the error handling from process_request_one transport: move error response helpers from connection to cql_server
503 lines
20 KiB
C++
503 lines
20 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "auth/service.hh"
|
|
#include "exceptions/exceptions.hh"
|
|
#include "timeout_config.hh"
|
|
#include "mutation/timestamp.hh"
|
|
#include "replica/database_fwd.hh"
|
|
#include "auth/authenticated_user.hh"
|
|
#include "auth/authenticator.hh"
|
|
#include "auth/permission.hh"
|
|
#include "client_data.hh"
|
|
|
|
#include "transport/cql_protocol_extension.hh"
|
|
#include "service/qos/service_level_controller.hh"
|
|
|
|
// Forward declarations, so this header does not have to include the full
// definitions of these types.
namespace auth { class resource; }
namespace data_dictionary { class database; }
|
|
|
|
namespace service {
|
|
|
|
class client_state;
|
|
struct forwarded_client_state {
|
|
sstring keyspace;
|
|
std::optional<sstring> username;
|
|
timeout_config timeout_config;
|
|
uint64_t protocol_extensions_mask;
|
|
gms::inet_address remote_address;
|
|
uint16_t remote_port;
|
|
|
|
forwarded_client_state(sstring keyspace,
|
|
std::optional<sstring> username,
|
|
::timeout_config timeout_config,
|
|
uint64_t protocol_extensions_mask,
|
|
gms::inet_address remote_address,
|
|
uint16_t remote_port);
|
|
forwarded_client_state(const client_state& cs);
|
|
};
|
|
|
|
/**
|
|
* State related to a client connection.
|
|
*/
|
|
class client_state {
|
|
public:
|
|
enum class auth_state : uint8_t {
|
|
UNINITIALIZED, AUTHENTICATION, READY
|
|
};
|
|
using workload_type = qos::service_level_options::workload_type;
|
|
|
|
// This class is used to move client_state between shards
|
|
// It is created on a shard that owns client_state than passed
|
|
// to a target shard where client_state_for_another_shard::get()
|
|
// can be called to obtain a shard local copy.
|
|
class client_state_for_another_shard {
|
|
private:
|
|
const client_state* _cs;
|
|
seastar::sharded<auth::service>* _auth_service;
|
|
seastar::sharded<qos::service_level_controller>* _sl_controller;
|
|
client_state_for_another_shard(const client_state* cs,
|
|
seastar::sharded<auth::service>* auth_service,
|
|
seastar::sharded<qos::service_level_controller>* sl_controller)
|
|
: _cs(cs), _auth_service(auth_service), _sl_controller(sl_controller) {}
|
|
friend client_state;
|
|
public:
|
|
client_state get() const {
|
|
return client_state(_cs, _auth_service, _sl_controller);
|
|
}
|
|
};
|
|
private:
|
|
client_state(const client_state* cs,
|
|
seastar::sharded<auth::service>* auth_service,
|
|
seastar::sharded<qos::service_level_controller>* sl_controller)
|
|
: _keyspace(cs->_keyspace)
|
|
, _user(cs->_user)
|
|
, _auth_state(cs->_auth_state)
|
|
, _is_internal(cs->_is_internal)
|
|
, _bypass_auth_checks(cs->_bypass_auth_checks)
|
|
, _remote_address(cs->_remote_address)
|
|
, _auth_service(auth_service ? &auth_service->local() : nullptr)
|
|
, _sl_controller(sl_controller ? &sl_controller->local() : nullptr)
|
|
, _default_timeout_config(cs->_default_timeout_config)
|
|
, _timeout_config(cs->_timeout_config)
|
|
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
|
|
{}
|
|
friend client_state_for_another_shard;
|
|
private:
|
|
sstring _keyspace;
|
|
#if 0
|
|
private static final Logger logger = LoggerFactory.getLogger(ClientState.class);
|
|
public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION;
|
|
|
|
private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new HashSet<>();
|
|
private static final Set<IResource> PROTECTED_AUTH_RESOURCES = new HashSet<>();
|
|
|
|
static
|
|
{
|
|
// We want these system cfs to be always readable to authenticated users since many tools rely on them
|
|
// (nodetool, cqlsh, bulkloader, etc.)
|
|
for (String cf : Iterables.concat(Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS), LegacySchemaTables.ALL))
|
|
READABLE_SYSTEM_RESOURCES.add(DataResource.columnFamily(SystemKeyspace.NAME, cf));
|
|
|
|
PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getAuthenticator().protectedResources());
|
|
PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getAuthorizer().protectedResources());
|
|
}
|
|
|
|
// Current user for the session
|
|
private volatile AuthenticatedUser user;
|
|
private volatile String keyspace;
|
|
#endif
|
|
std::optional<auth::authenticated_user> _user;
|
|
std::optional<client_options_cache_entry_type> _driver_name, _driver_version;
|
|
std::list<client_option_key_value_cached_entry> _client_options;
|
|
|
|
auth_state _auth_state = auth_state::UNINITIALIZED;
|
|
bool _control_connection = false;
|
|
|
|
// isInternal is used to mark ClientState as used by some internal component
|
|
// that should have an ability to modify system keyspace.
|
|
bool _is_internal;
|
|
|
|
// bypass_auth_checks is used to skip authorization checks.
|
|
// This is used by the maintenance socket to allow privileged access without
|
|
// going through normal auth, while still treating queries as external.
|
|
bool _bypass_auth_checks;
|
|
|
|
// The biggest timestamp that was returned by getTimestamp/assigned to a query
|
|
static thread_local api::timestamp_type _last_timestamp_micros;
|
|
|
|
// Address of a client
|
|
socket_address _remote_address;
|
|
|
|
// Only populated for external client state.
|
|
auth::service* _auth_service{nullptr};
|
|
qos::service_level_controller* _sl_controller{nullptr};
|
|
|
|
// For restoring default values in the timeout config
|
|
timeout_config _default_timeout_config;
|
|
timeout_config _timeout_config;
|
|
|
|
workload_type _workload_type = workload_type::unspecified;
|
|
|
|
public:
|
|
struct internal_tag {};
|
|
struct external_tag {};
|
|
|
|
workload_type get_workload_type() const noexcept {
|
|
return _workload_type;
|
|
}
|
|
|
|
auth_state get_auth_state() const noexcept {
|
|
return _auth_state;
|
|
}
|
|
|
|
void set_auth_state(auth_state new_state) noexcept {
|
|
_auth_state = new_state;
|
|
}
|
|
|
|
bool is_control_connection() const noexcept {
|
|
return _control_connection;
|
|
}
|
|
|
|
bool set_control_connection() noexcept {
|
|
return _control_connection = true;
|
|
}
|
|
|
|
std::optional<client_options_cache_entry_type> get_driver_name() const {
|
|
return _driver_name;
|
|
}
|
|
future<> set_driver_name(client_options_cache_type& keys_and_values_cache, const sstring& driver_name) {
|
|
_driver_name = co_await keys_and_values_cache.get_or_load(driver_name, [] (const client_options_cache_key_type&) {
|
|
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
|
});
|
|
}
|
|
|
|
const auto& get_client_options() const {
|
|
return _client_options;
|
|
}
|
|
|
|
future<> set_client_options(
|
|
client_options_cache_type& keys_and_values_cache,
|
|
const std::unordered_map<sstring, sstring>& client_options);
|
|
|
|
std::optional<client_options_cache_entry_type> get_driver_version() const {
|
|
return _driver_version;
|
|
}
|
|
future<> set_driver_version(
|
|
client_options_cache_type& keys_and_values_cache,
|
|
const sstring& driver_version)
|
|
{
|
|
_driver_version = co_await keys_and_values_cache.get_or_load(driver_version, [] (const client_options_cache_key_type&) {
|
|
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
|
});
|
|
}
|
|
|
|
client_state(external_tag,
|
|
auth::service& auth_service,
|
|
qos::service_level_controller* sl_controller,
|
|
timeout_config timeout_config,
|
|
const socket_address& remote_address = socket_address(),
|
|
bool bypass_auth_checks = false)
|
|
: _is_internal(false)
|
|
, _bypass_auth_checks(bypass_auth_checks)
|
|
, _remote_address(remote_address)
|
|
, _auth_service(&auth_service)
|
|
, _sl_controller(sl_controller)
|
|
, _default_timeout_config(timeout_config)
|
|
, _timeout_config(timeout_config) {
|
|
if (!auth_service.underlying_authenticator().require_authentication()) {
|
|
_user = auth::authenticated_user();
|
|
}
|
|
}
|
|
|
|
gms::inet_address get_client_address() const {
|
|
return gms::inet_address(_remote_address);
|
|
}
|
|
|
|
::in_port_t get_client_port() const {
|
|
return _remote_address.port();
|
|
}
|
|
|
|
const socket_address& get_remote_address() const {
|
|
return _remote_address;
|
|
}
|
|
|
|
const timeout_config& get_timeout_config() const {
|
|
return _timeout_config;
|
|
}
|
|
|
|
qos::service_level_controller& get_service_level_controller() const {
|
|
return *_sl_controller;
|
|
}
|
|
|
|
client_state(internal_tag) : client_state(internal_tag{}, infinite_timeout_config)
|
|
{}
|
|
|
|
client_state(internal_tag, const timeout_config& config)
|
|
: _keyspace("system")
|
|
, _is_internal(true)
|
|
, _bypass_auth_checks(true)
|
|
, _default_timeout_config(config)
|
|
, _timeout_config(config)
|
|
{}
|
|
|
|
client_state(internal_tag, auth::service& auth_service, qos::service_level_controller& sl_controller, sstring username)
|
|
: _user(auth::authenticated_user(username))
|
|
, _auth_state(auth_state::READY)
|
|
, _is_internal(true)
|
|
, _bypass_auth_checks(true)
|
|
, _auth_service(&auth_service)
|
|
, _sl_controller(&sl_controller)
|
|
{}
|
|
|
|
client_state(auth::service& auth_service,
|
|
qos::service_level_controller* sl_controller,
|
|
forwarded_client_state&& forwarded_state)
|
|
: _keyspace(std::move(forwarded_state.keyspace))
|
|
, _user(forwarded_state.username ? auth::authenticated_user(*forwarded_state.username) : auth::authenticated_user{})
|
|
, _auth_state(auth_state::READY)
|
|
, _is_internal(false)
|
|
, _bypass_auth_checks(false)
|
|
, _remote_address(socket_address(forwarded_state.remote_address, forwarded_state.remote_port))
|
|
, _auth_service(&auth_service)
|
|
, _sl_controller(sl_controller)
|
|
, _default_timeout_config(forwarded_state.timeout_config)
|
|
, _timeout_config(std::move(forwarded_state.timeout_config))
|
|
, _enabled_protocol_extensions(cql_transport::cql_protocol_extension_enum_set::from_mask(
|
|
forwarded_state.protocol_extensions_mask))
|
|
{}
|
|
|
|
client_state(const client_state&) = delete;
|
|
client_state(client_state&&) = default;
|
|
|
|
///
|
|
/// `nullptr` for internal instances.
|
|
///
|
|
auth::service* get_auth_service() const {
|
|
return _auth_service;
|
|
}
|
|
|
|
bool is_internal() const {
|
|
return _is_internal;
|
|
}
|
|
|
|
/**
|
|
* @return a ClientState object for internal C* calls (not limited by any kind of auth).
|
|
*/
|
|
static client_state& for_internal_calls() {
|
|
static thread_local client_state s(internal_tag{});
|
|
return s;
|
|
}
|
|
|
|
/**
|
|
* This clock guarantees that updates for the same ClientState will be ordered
|
|
* in the sequence seen, even if multiple updates happen in the same millisecond.
|
|
*/
|
|
api::timestamp_type get_timestamp() {
|
|
auto current = api::new_timestamp();
|
|
auto last = _last_timestamp_micros;
|
|
auto result = last >= current ? last + 1 : current;
|
|
_last_timestamp_micros = result;
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Returns a timestamp suitable for paxos given the timestamp of the last known commit (or in progress update).
|
|
*
|
|
* Paxos ensures that the timestamp it uses for commits respects the serial order of those commits. It does so
|
|
* by having each replica reject any proposal whose timestamp is not strictly greater than the last proposal it
|
|
* accepted. So in practice, which timestamp we use for a given proposal doesn't affect correctness but it does
|
|
* affect the chance of making progress (if we pick a timestamp lower than what has been proposed before, our
|
|
* new proposal will just get rejected).
|
|
*
|
|
* As during the prepared phase replica send us the last propose they accepted, a first option would be to take
|
|
* the maximum of those last accepted proposal timestamp plus 1 (and use a default value, say 0, if it's the
|
|
* first known proposal for the partition). This would mostly work (giving commits the timestamp 0, 1, 2, ...
|
|
* in the order they are committed) but with 2 important caveats:
|
|
* 1) it would give a very poor experience when Paxos and non-Paxos updates are mixed in the same partition,
|
|
* since paxos operations wouldn't be using microseconds timestamps. And while you shouldn't theoretically
|
|
* mix the 2 kind of operations, this would still be pretty nonintuitive. And what if you started writing
|
|
* normal updates and realize later you should switch to Paxos to enforce a property you want?
|
|
* 2) this wouldn't actually be safe due to the expiration set on the Paxos state table.
|
|
*
|
|
* So instead, we initially chose to use the current time in microseconds as for normal update. Which works in
|
|
* general but mean that clock skew creates unavailability periods for Paxos updates (either a node has his clock
|
|
* in the past and he may no be able to get commit accepted until its clock catch up, or a node has his clock in
|
|
* the future and then once one of its commit his accepted, other nodes ones won't be until they catch up). This
|
|
* is ok for small clock skew (few ms) but can be pretty bad for large one.
|
|
*
|
|
* Hence our current solution: we mix both approaches. That is, we compare the timestamp of the last known
|
|
* accepted proposal and the local time. If the local time is greater, we use it, thus keeping paxos timestamps
|
|
* locked to the current time in general (making mixing Paxos and non-Paxos more friendly, and behaving correctly
|
|
* when the paxos state expire (as long as your maximum clock skew is lower than the Paxos state expiration
|
|
* time)). Otherwise (the local time is lower than the last proposal, meaning that this last proposal was done
|
|
* with a clock in the future compared to the local one), we use the last proposal timestamp plus 1, ensuring
|
|
* progress.
|
|
*
|
|
* @param min_timestamp_to_use the max timestamp of the last proposal accepted by replica having responded
|
|
* to the prepare phase of the paxos round this is for. In practice, that's the minimum timestamp this method
|
|
* may return.
|
|
* @return a timestamp suitable for a Paxos proposal (using the reasoning described above). Note that
|
|
* contrary to the get_timestamp() method, the return value is not guaranteed to be unique (nor
|
|
* monotonic) across calls since it can return it's argument (so if the same argument is passed multiple times,
|
|
* it may be returned multiple times). Note that we still ensure Paxos "ballot" are unique (for different
|
|
* proposal) by (securely) randomizing the non-timestamp part of the UUID.
|
|
*/
|
|
api::timestamp_type get_timestamp_for_paxos(api::timestamp_type min_timestamp_to_use) {
|
|
api::timestamp_type current = std::max(api::new_timestamp(), min_timestamp_to_use);
|
|
_last_timestamp_micros = _last_timestamp_micros >= current ? _last_timestamp_micros + 1 : current;
|
|
return _last_timestamp_micros;
|
|
}
|
|
|
|
#if 0
|
|
public SocketAddress getRemoteAddress()
|
|
{
|
|
return remoteAddress;
|
|
}
|
|
#endif
|
|
|
|
const sstring& get_raw_keyspace() const noexcept {
|
|
return _keyspace;
|
|
}
|
|
|
|
sstring& get_raw_keyspace() noexcept {
|
|
return _keyspace;
|
|
}
|
|
|
|
public:
|
|
void set_keyspace(replica::database& db, std::string_view keyspace);
|
|
|
|
void set_raw_keyspace(sstring new_keyspace) noexcept {
|
|
_keyspace = std::move(new_keyspace);
|
|
}
|
|
|
|
const sstring& get_keyspace() const {
|
|
if (_keyspace.empty()) {
|
|
throw exceptions::invalid_request_exception("No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename");
|
|
}
|
|
return _keyspace;
|
|
}
|
|
|
|
/**
|
|
* Sets active user. Does _not_ validate anything
|
|
*/
|
|
void set_login(auth::authenticated_user);
|
|
|
|
/// \brief A user can login if it's anonymous, or if it exists and the `LOGIN` option for the user is `true`.
|
|
future<> check_user_can_login();
|
|
|
|
future<> has_all_keyspaces_access(auth::permission) const;
|
|
future<> has_keyspace_access(const sstring&, auth::permission) const;
|
|
future<> has_column_family_access(const sstring&, const sstring&, auth::permission,
|
|
auth::command_desc::type = auth::command_desc::type::OTHER, std::optional<bool> is_vector_indexed = std::nullopt) const;
|
|
|
|
future<> has_functions_access(auth::permission p) const;
|
|
future<> has_functions_access(const sstring& ks, auth::permission p) const;
|
|
future<> has_function_access(const sstring& ks, const sstring& function_signature, auth::permission p) const;
|
|
private:
|
|
future<> check_internal_table_permissions(std::string_view ks, std::string_view table_name, const auth::command_desc& cmd) const;
|
|
future<> has_access(const sstring& keyspace, auth::command_desc, std::optional<bool> is_vector_indexed = std::nullopt) const;
|
|
sstring generate_authorization_error_msg(const auth::command_desc&) const;
|
|
sstring generate_authorization_error_msg(const auth::command_desc_with_permission_set&) const;
|
|
|
|
public:
|
|
template<typename Cmd = auth::command_desc> future<bool> check_has_permission(Cmd) const;
|
|
template<typename Cmd = auth::command_desc> future<> ensure_has_permission(Cmd) const;
|
|
void maybe_update_per_service_level_params();
|
|
void update_per_service_level_params(qos::service_level_options& slo);
|
|
|
|
/**
|
|
* Returns an exceptional future with \ref exceptions::invalid_request_exception if the resource does not exist.
|
|
*/
|
|
future<> ensure_exists(const auth::resource&) const;
|
|
|
|
void validate_login() const;
|
|
void ensure_not_anonymous() const; // unauthorized_exception on error
|
|
|
|
/// Returns true if the user has superuser privileges.
|
|
/// Internal clients are always considered superusers.
|
|
future<bool> has_superuser() const;
|
|
|
|
#if 0
|
|
public void ensureIsSuper(String message) throws UnauthorizedException
|
|
{
|
|
if (DatabaseDescriptor.getAuthenticator().requireAuthentication() && (user == null || !user.isSuper()))
|
|
throw new UnauthorizedException(message);
|
|
}
|
|
|
|
private static void validateKeyspace(String keyspace) throws InvalidRequestException
|
|
{
|
|
if (keyspace == null)
|
|
throw new InvalidRequestException("You have not set a keyspace for this session");
|
|
}
|
|
#endif
|
|
|
|
const std::optional<auth::authenticated_user>& user() const {
|
|
return _user;
|
|
}
|
|
|
|
client_state_for_another_shard move_to_other_shard() {
|
|
return client_state_for_another_shard(this,
|
|
_auth_service ? &_auth_service->container() : nullptr,
|
|
_sl_controller ? &_sl_controller->container() : nullptr);
|
|
}
|
|
|
|
#if 0
|
|
public static SemanticVersion[] getCQLSupportedVersion()
|
|
{
|
|
return new SemanticVersion[]{ QueryProcessor.CQL_VERSION };
|
|
}
|
|
|
|
private Set<Permission> authorize(IResource resource)
|
|
{
|
|
// AllowAllAuthorizer or manually disabled caching.
|
|
if (Auth.permissionsCache == null)
|
|
return DatabaseDescriptor.getAuthorizer().authorize(user, resource);
|
|
|
|
try
|
|
{
|
|
return Auth.permissionsCache.get(Pair.create(user, resource));
|
|
}
|
|
catch (ExecutionException e)
|
|
{
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
private:
|
|
|
|
cql_transport::cql_protocol_extension_enum_set _enabled_protocol_extensions;
|
|
|
|
public:
|
|
|
|
bool is_protocol_extension_set(cql_transport::cql_protocol_extension ext) const {
|
|
return _enabled_protocol_extensions.contains(ext);
|
|
}
|
|
|
|
cql_transport::cql_protocol_extension_enum_set get_protocol_extensions() const {
|
|
return _enabled_protocol_extensions;
|
|
}
|
|
|
|
void set_protocol_extensions(cql_transport::cql_protocol_extension_enum_set exts) {
|
|
_enabled_protocol_extensions = std::move(exts);
|
|
}
|
|
};
|
|
|
|
}
|
|
|