Merge 'Coroutinize some auth and service levels related functions' from Marcin Maliszkiewicz

Coroutinization will help improve readability and allow easier changes planned for this code.

This work was separated from https://github.com/scylladb/scylladb/pull/17910 to make it smoother to review and merge.

Closes scylladb/scylladb#18788

* github.com:scylladb/scylladb:
  cql3: coroutinize create/alter/drop service levels
  auth: coroutinize alter_role and drop_role
  auth: coroutinize grant_permissions and revoke_permissions
  auth: coroutinize create_role
  cql3: statements: co-routinize auth related statements
  cql3: statements: release unused guard explicitly in auth related statements
This commit is contained in:
Avi Kivity
2024-05-21 17:45:19 +03:00
12 changed files with 176 additions and 184 deletions

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <exception>
#include <seastar/core/coroutine.hh>
#include "auth/resource.hh"
#include "auth/service.hh"
@@ -437,23 +438,23 @@ future<> create_role(
std::string_view name,
const role_config& config,
const authentication_options& options) {
return ser.underlying_role_manager().create(name, config).then([&ser, name, &options] {
if (!auth::any_authentication_options(options)) {
return make_ready_future<>();
}
return futurize_invoke(
&validate_authentication_options_are_supported,
options,
ser.underlying_authenticator().supported_options()).then([&ser, name, &options] {
return ser.underlying_authenticator().create(name, options);
}).handle_exception([&ser, name](std::exception_ptr ep) {
// Roll-back.
return ser.underlying_role_manager().drop(name).then([ep = std::move(ep)] {
std::rethrow_exception(ep);
});
});
});
co_await ser.underlying_role_manager().create(name, config);
if (!auth::any_authentication_options(options)) {
co_return;
}
std::exception_ptr ep;
try {
validate_authentication_options_are_supported(options,
ser.underlying_authenticator().supported_options());
co_await ser.underlying_authenticator().create(name, options);
} catch (...) {
ep = std::current_exception();
}
if (ep) {
// Roll-back.
co_await ser.underlying_role_manager().drop(name);
std::rethrow_exception(std::move(ep));
}
}
future<> alter_role(
@@ -461,36 +462,26 @@ future<> alter_role(
std::string_view name,
const role_config_update& config_update,
const authentication_options& options) {
return ser.underlying_role_manager().alter(name, config_update).then([&ser, name, &options] {
if (!any_authentication_options(options)) {
return make_ready_future<>();
}
return futurize_invoke(
&validate_authentication_options_are_supported,
options,
ser.underlying_authenticator().supported_options()).then([&ser, name, &options] {
return ser.underlying_authenticator().alter(name, options);
});
});
co_await ser.underlying_role_manager().alter(name, config_update);
if (!any_authentication_options(options)) {
co_return;
}
validate_authentication_options_are_supported(options,
ser.underlying_authenticator().supported_options());
co_await ser.underlying_authenticator().alter(name, options);
}
future<> drop_role(const service& ser, std::string_view name) {
return do_with(make_role_resource(name), [&ser, name](const resource& r) {
auto& a = ser.underlying_authorizer();
return when_all_succeed(
a.revoke_all(name),
a.revoke_all(r))
.discard_result()
.handle_exception_type([](const unsupported_authorization_operation&) {
// Nothing.
});
}).then([&ser, name] {
return ser.underlying_authenticator().drop(name);
}).then([&ser, name] {
return ser.underlying_role_manager().drop(name);
});
auto& a = ser.underlying_authorizer();
auto r = make_role_resource(name);
try {
co_await a.revoke_all(name);
co_await a.revoke_all(r);
} catch (const unsupported_authorization_operation&) {
// Nothing.
}
co_await ser.underlying_authenticator().drop(name);
co_await ser.underlying_role_manager().drop(name);
}
future<bool> has_role(const service& ser, std::string_view grantee, std::string_view name) {
@@ -513,9 +504,8 @@ future<> grant_permissions(
std::string_view role_name,
permission_set perms,
const resource& r) {
return validate_role_exists(ser, role_name).then([&ser, role_name, perms, &r] {
return ser.underlying_authorizer().grant(role_name, perms, r);
});
co_await validate_role_exists(ser, role_name);
co_await ser.underlying_authorizer().grant(role_name, perms, r);
}
future<> grant_applicable_permissions(const service& ser, std::string_view role_name, const resource& r) {
@@ -534,9 +524,8 @@ future<> revoke_permissions(
std::string_view role_name,
permission_set perms,
const resource& r) {
return validate_role_exists(ser, role_name).then([&ser, role_name, perms, &r] {
return ser.underlying_authorizer().revoke(role_name, perms, r);
});
co_await validate_role_exists(ser, role_name);
co_await ser.underlying_authorizer().revoke(role_name, perms, r);
}
future<std::vector<permission_details>> list_filtered_permissions(

View File

@@ -39,11 +39,8 @@ alter_service_level_statement::execute(query_processor& qp,
const query_options &, std::optional<service::group0_guard> guard) const {
qos::service_level& sl = state.get_service_level_controller().get_service_level(_service_level);
qos::service_level_options slo = _slo.replace_defaults(sl.slo);
return state.get_service_level_controller().alter_distributed_service_level(_service_level, slo, std::move(guard)).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
co_await state.get_service_level_controller().alter_distributed_service_level(_service_level, slo, std::move(guard));
co_return nullptr;
}
}
}

View File

@@ -43,18 +43,20 @@ attach_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
return state.get_service_level_controller().get_distributed_service_level(_service_level).then([this] (qos::service_levels_info sli) {
if (sli.empty()) {
throw qos::nonexistant_service_level_exception(_service_level);
}
}).then([&state, this] () {
return state.get_client_state().get_auth_service()->underlying_role_manager().set_attribute(_role_name, "service_level", _service_level).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
});
if (guard) {
release_guard(std::move(*guard));
}
auto sli = co_await state.get_service_level_controller().get_distributed_service_level(_service_level);
if (sli.empty()) {
throw qos::nonexistant_service_level_exception(_service_level);
}
co_await state.get_client_state().get_auth_service()->underlying_role_manager().
set_attribute(_role_name, "service_level", _service_level);
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
co_return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
}
}
}

View File

@@ -138,14 +138,15 @@ cql3::statements::create_keyspace_statement::prepare(data_dictionary::database d
}
future<> cql3::statements::create_keyspace_statement::grant_permissions_to_creator(const service::client_state& cs) const {
return do_with(auth::make_data_resource(keyspace()), [&cs](const auth::resource& r) {
return auth::grant_applicable_permissions(
auto resource = auth::make_data_resource(keyspace());
try {
co_await auth::grant_applicable_permissions(
*cs.get_auth_service(),
*cs.user(),
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
// Nothing.
});
});
resource);
} catch (const auth::unsupported_authorization_operation&) {
// Nothing.
}
}
// Check for replication strategy choices which are restricted by the

View File

@@ -39,11 +39,8 @@ create_service_level_statement::execute(query_processor& qp,
const query_options &,
std::optional<service::group0_guard> guard) const {
qos::service_level_options slo = _slo.replace_defaults(qos::service_level_options{});
return state.get_service_level_controller().add_distributed_service_level(_service_level, slo, _if_not_exists, std::move(guard)).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
co_await state.get_service_level_controller().add_distributed_service_level(_service_level, slo, _if_not_exists, std::move(guard));
co_return nullptr;
}
}
}

View File

@@ -143,14 +143,15 @@ create_table_statement::prepare(data_dictionary::database db, cql_stats& stats)
}
future<> create_table_statement::grant_permissions_to_creator(const service::client_state& cs) const {
return do_with(auth::make_data_resource(keyspace(), column_family()), [&cs](const auth::resource& r) {
return auth::grant_applicable_permissions(
auto resource = auth::make_data_resource(keyspace(), column_family());
try {
co_await auth::grant_applicable_permissions(
*cs.get_auth_service(),
*cs.user(),
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
// Nothing.
});
});
resource);
} catch (const auth::unsupported_authorization_operation&) {
// Nothing.
}
}
create_table_statement::raw_statement::raw_statement(cf_name name, bool if_not_exists)

View File

@@ -41,11 +41,15 @@ detach_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
return state.get_client_state().get_auth_service()->underlying_role_manager().remove_attribute(_role_name, "service_level").then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
if (guard) {
release_guard(std::move(*guard));
}
co_await state.get_client_state().get_auth_service()->underlying_role_manager().
remove_attribute(_role_name, "service_level");
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
co_return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
}
}
}

View File

@@ -35,11 +35,8 @@ drop_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
return state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists, std::move(guard)).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
co_await state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists, std::move(guard));
co_return nullptr;
}
}
}

View File

@@ -20,15 +20,17 @@ std::unique_ptr<cql3::statements::prepared_statement> cql3::statements::grant_st
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::grant_statement::execute(query_processor&, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auto& auth_service = *state.get_client_state().get_auth_service();
try {
co_await auth::grant_permissions(auth_service, _role_name, _permissions, _resource);
} catch (const auth::nonexistant_role& e) {
throw exceptions::invalid_request_exception(e.what());
} catch (const auth::unsupported_authorization_operation& e) {
throw exceptions::invalid_request_exception(e.what());
}
return auth::grant_permissions(auth_service, _role_name, _permissions, _resource).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
}).handle_exception_type([](const auth::nonexistant_role& e) {
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
exceptions::invalid_request_exception(e.what()));
}).handle_exception_type([](const auth::unsupported_authorization_operation& e) {
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
exceptions::invalid_request_exception(e.what()));
});
co_return ::shared_ptr<cql_transport::messages::result_message>();
}

View File

@@ -20,15 +20,18 @@ std::unique_ptr<cql3::statements::prepared_statement> cql3::statements::revoke_s
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::revoke_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auto& auth_service = *state.get_client_state().get_auth_service();
return auth::revoke_permissions(auth_service, _role_name, _permissions, _resource).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
}).handle_exception_type([](const auth::nonexistant_role& e) {
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
exceptions::invalid_request_exception(e.what()));
}).handle_exception_type([](const auth::unsupported_authorization_operation& e) {
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
exceptions::invalid_request_exception(e.what()));
});
try {
co_await auth::revoke_permissions(auth_service, _role_name, _permissions, _resource);
} catch (const auth::nonexistant_role& e) {
throw exceptions::invalid_request_exception(e.what());
} catch (const auth::unsupported_authorization_operation& e) {
throw exceptions::invalid_request_exception(e.what());
}
co_return ::shared_ptr<cql_transport::messages::result_message>();
}

View File

@@ -27,6 +27,7 @@
#include "exceptions/exceptions.hh"
#include "service/storage_proxy.hh"
#include "transport/messages/result_message.hh"
#include "service/raft/raft_group0_client.hh"
namespace cql3 {
@@ -46,10 +47,6 @@ static auth::authentication_options extract_authentication_options(const cql3::r
return authen_options;
}
static future<result_message_ptr> void_result_message() {
return make_ready_future<result_message_ptr>(nullptr);
}
//
// `create_role_statement`
//
@@ -60,14 +57,15 @@ std::unique_ptr<prepared_statement> create_role_statement::prepare(
}
future<> create_role_statement::grant_permissions_to_creator(const service::client_state& cs) const {
return do_with(auth::make_role_resource(_role), [&cs](const auth::resource& r) {
return auth::grant_applicable_permissions(
auto resource = auth::make_role_resource(_role);
try {
co_await auth::grant_applicable_permissions(
*cs.get_auth_service(),
*cs.user(),
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
// Nothing.
});
});
resource);
} catch (const auth::unsupported_authorization_operation&) {
// Nothing.
}
}
future<> create_role_statement::check_access(query_processor& qp, const service::client_state& state) const {
@@ -89,31 +87,28 @@ create_role_statement::execute(query_processor&,
service::query_state& state,
const query_options&,
std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auth::role_config config;
config.is_superuser = *_options.is_superuser;
config.can_login = *_options.can_login;
return do_with(
std::move(config),
extract_authentication_options(_options),
[this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
const auto& cs = state.get_client_state();
auto& as = *cs.get_auth_service();
const auto& cs = state.get_client_state();
auto& as = *cs.get_auth_service();
return auth::create_role(as, _role, config, authen_options).then([this, &cs] {
return grant_permissions_to_creator(cs);
}).then([] {
return void_result_message();
}).handle_exception_type([this](const auth::role_already_exists& e) {
if (!_if_not_exists) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
}
try {
co_await auth::create_role(as, _role, config, extract_authentication_options(_options));
co_await grant_permissions_to_creator(cs);
} catch (const auth::role_already_exists& e) {
if (!_if_not_exists) {
throw exceptions::invalid_request_exception(e.what());
}
} catch (const auth::unsupported_authentication_option& e) {
throw exceptions::invalid_request_exception(e.what());
}
return void_result_message();
}).handle_exception_type([](const auth::unsupported_authentication_option& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
});
});
co_return nullptr;
}
//
@@ -173,24 +168,23 @@ future<> alter_role_statement::check_access(query_processor& qp, const service::
future<result_message_ptr>
alter_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auth::role_config_update update;
update.is_superuser = _options.is_superuser;
update.can_login = _options.can_login;
return do_with(
std::move(update),
extract_authentication_options(_options),
[this, &state](const auth::role_config_update& update, const auth::authentication_options& authen_options) {
auto& as = *state.get_client_state().get_auth_service();
auto& as = *state.get_client_state().get_auth_service();
try {
co_await auth::alter_role(as, _role, update, extract_authentication_options(_options));
} catch (const auth::nonexistant_role& e) {
throw exceptions::invalid_request_exception(e.what());
} catch (const auth::unsupported_authentication_option& e) {
throw exceptions::invalid_request_exception(e.what());
}
return auth::alter_role(as, _role, update, authen_options).then([] {
return void_result_message();
}).handle_exception_type([](const auth::nonexistant_role& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
}).handle_exception_type([](const auth::unsupported_authentication_option& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
});
});
co_return nullptr;
}
//
@@ -235,17 +229,19 @@ future<> drop_role_statement::check_access(query_processor& qp, const service::c
future<result_message_ptr>
drop_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auto& as = *state.get_client_state().get_auth_service();
return auth::drop_role(as, _role).then([] {
return void_result_message();
}).handle_exception_type([this](const auth::nonexistant_role& e) {
if (!_if_exists) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
try {
co_await auth::drop_role(as, _role);
} catch (const auth::nonexistant_role& e) {
if (!_if_exists) {
throw exceptions::invalid_request_exception(e.what());
}
}
return void_result_message();
});
co_return nullptr;
}
//
@@ -402,13 +398,17 @@ future<> grant_role_statement::check_access(query_processor& qp, const service::
future<result_message_ptr>
grant_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auto& as = *state.get_client_state().get_auth_service();
try {
co_await as.underlying_role_manager().grant(_grantee, _role);
} catch (const auth::roles_argument_exception& e) {
throw exceptions::invalid_request_exception(e.what());
}
return as.underlying_role_manager().grant(_grantee, _role).then([] {
return void_result_message();
}).handle_exception_type([](const auth::roles_argument_exception& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
});
co_return nullptr;
}
//
@@ -433,13 +433,16 @@ future<result_message_ptr> revoke_role_statement::execute(
service::query_state& state,
const query_options&,
std::optional<service::group0_guard> guard) const {
if (guard) {
release_guard(std::move(*guard));
}
auto& rm = state.get_client_state().get_auth_service()->underlying_role_manager();
return rm.revoke(_revokee, _role).then([] {
return void_result_message();
}).handle_exception_type([](const auth::roles_argument_exception& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
});
try {
co_await rm.revoke(_revokee, _role);
} catch (const auth::roles_argument_exception& e) {
throw exceptions::invalid_request_exception(e.what());
}
co_return nullptr;
}
}

View File

@@ -70,18 +70,14 @@ schema_altering_statement::execute(query_processor& qp, service::query_state& st
throw std::logic_error(format("Attempted to modify {} via internal query: such schema changes are not propagated and thus illegal", info));
}
}
return qp.execute_schema_statement(*this, state, options, std::move(guard)).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
// We don't want to grant the permissions to the supposed creator even if the statement succeeded if it's an internal query
// or if the query did not actually create the item, i.e. the query is bounced to another shard or it's a IF NOT EXISTS
// query where the item already exists.
auto permissions_granted_fut = internal || !result->is_schema_change()
? make_ready_future<>()
: grant_permissions_to_creator(state.get_client_state());
return permissions_granted_fut.then([result = std::move(result)] {
return result;
});
});
auto result = co_await qp.execute_schema_statement(*this, state, options, std::move(guard));
// We don't want to grant the permissions to the supposed creator even if the statement succeeded if it's an internal query
// or if the query did not actually create the item, i.e. the query is bounced to another shard or it's a IF NOT EXISTS
// query where the item already exists.
if (!internal && result->is_schema_change()) {
co_await grant_permissions_to_creator(state.get_client_state());
}
co_return std::move(result);
}
}