From 51186b2f2c66d5107e57d0dd54caed46a82ceda8 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Tue, 12 Aug 2025 11:38:53 +0300 Subject: [PATCH] alternator: add alternator_warn_authorization config Before this patch, the configuration alternator_enforce_authorization is a boolean: true means enforce authentication checks (i.e., each request is signed by a valid user) and authorization checks (the user who signed the request is allowed by RBAC to perform this request). This patch adds a second boolean configuration option, alternator_warn_authorization. When alternator_enforce_authorization is false but alternator_warn_authorization is true, authentication and authorization checks are performed as in enforce mode, but failures are ignored and counted in two new metrics: scylla_alternator_authentication_failures scylla_alternator_authorization_failures Additionally, each authentication or authorization error is logged as a WARN-level log message. Some users prefer those log messages over metrics, as the log messages contain additional information about the failure that can be useful - such as the address of the misconfigured client, or the username attempted in the request. All combinations of the two configuration options are allowed: * If just "enforce" is true, auth failures cause a request failure. The failures are counted, but not logged. * If both "enforce" and "warn" are true, auth failures cause a request failure. The failures are both counted and logged. * If just "warn" is true, auth failures are ignored (the request is allowed to complete) but are counted and logged. * If neither "enforce" nor "warn" is true, no authentication or authorization checks are done at all. So we don't know about failures, so naturally we don't count them and don't log them. This patch is fairly straightforward, doing mainly the following things: 1. Add an alternator_warn_authorization config parameter. 2. 
Make sure alternator_enforce_authorization is live-updatable (we'll use this in a test in the next patch). It "almost" was, but a typo prevented the live update from working properly. 3. Add the two new metrics, and increment them in every type of authentication or authorization error. Some code that needs to increment these new metrics didn't have access to the "stats" object, so we had to pass it around more. 4. Add log messages when alternator_warn_authorization is true. 5. If alternator_enforce_authorization is false, let the request proceed despite a failed auth check (after having counted and/or logged the auth error). A separate patch will follow and add documentation suggesting to users how to use the new "warn" options to safely switch from non-enforcing to enforcing mode. Another patch will add tests for the new configuration options, new metrics and new log messages. Fixes #25308. Signed-off-by: Nadav Har'El --- alternator/controller.cc | 1 + alternator/executor.cc | 92 ++++++++++++++++++++++++++-------------- alternator/executor.hh | 3 +- alternator/server.cc | 72 ++++++++++++++++++++++++++----- alternator/server.hh | 3 +- alternator/stats.cc | 10 +++++ alternator/stats.hh | 11 +++++ alternator/streams.cc | 2 +- alternator/ttl.cc | 2 +- db/config.cc | 3 +- db/config.hh | 1 + 11 files changed, 151 insertions(+), 49 deletions(-) diff --git a/alternator/controller.cc b/alternator/controller.cc index 0cb889a2d2..64e6114f75 100644 --- a/alternator/controller.cc +++ b/alternator/controller.cc @@ -136,6 +136,7 @@ future<> controller::start_server() { [this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable { return server.init(addr, alternator_port, alternator_https_port, creds, _config.alternator_enforce_authorization, + _config.alternator_warn_authorization, _config.alternator_max_users_query_size_in_trace_output, &_memory_limiter.local().get_semaphore(), _config.max_concurrent_requests_per_shard); diff 
--git a/alternator/executor.cc b/alternator/executor.cc index 28ac9a734b..8979d7332e 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -244,7 +244,8 @@ executor::executor(gms::gossiper& gossiper, _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), - _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()), + _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization), + _warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization), _ssg(ssg), _parsed_expression_cache(std::make_unique( parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard}, @@ -880,15 +881,37 @@ future executor::describe_table(client_state& cli co_return rjson::print(std::move(response)); } +// This function increments the authorization_failures counter, and may also +// log a warn-level message and/or throw an access_denied exception, depending +// on what enforce_authorization and warn_authorization are set to. +// Note that if enforce_authorization is false, this function will return +// without throwing. So a caller that doesn't want to continue after an +// authorization_error must explicitly return after calling this function. +static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) { + stats.authorization_failures++; + if (enforce_authorization) { + if (warn_authorization) { + elogger.warn("alternator_warn_authorization=true: {}", msg); + } + throw api_error::access_denied(std::move(msg)); + } else { + if (warn_authorization) { + elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg); + } + } +} + // Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY, // SELECT, DROP, etc.) on the given table. 
When permission is denied an // appropriate user-readable api_error::access_denied is thrown. future<> verify_permission( bool enforce_authorization, + bool warn_authorization, const service::client_state& client_state, const schema_ptr& schema, - auth::permission permission_to_check) { - if (!enforce_authorization) { + auth::permission permission_to_check, + alternator::stats& stats) { + if (!enforce_authorization && !warn_authorization) { co_return; } // Unfortunately, the fix for issue #23218 did not modify the function @@ -903,31 +926,33 @@ future<> verify_permission( if (client_state.user() && client_state.user()->name) { username = client_state.user()->name.value(); } - throw api_error::access_denied(fmt::format( + authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( "Write access denied on internal table {}.{} to role {} because it is not a superuser", schema->ks_name(), schema->cf_name(), username)); + co_return; } } auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name()); - if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) { + if (!client_state.user() || !client_state.user()->name || + !co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) { sstring username = ""; if (client_state.user() && client_state.user()->name) { username = client_state.user()->name.value(); } // Using exceptions for errors makes this function faster in the // success path (when the operation is allowed). 
- throw api_error::access_denied(format( - "{} access on table {}.{} is denied to role {}", + authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( + "{} access on table {}.{} is denied to role {}, client address {}", auth::permissions::to_string(permission_to_check), - schema->ks_name(), schema->cf_name(), username)); + schema->ks_name(), schema->cf_name(), username, client_state.get_client_address())); } } // Similar to verify_permission() above, but just for CREATE operations. // Those do not operate on any specific table, so require permissions on // ALL KEYSPACES instead of any specific table. -future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) { - if (!enforce_authorization) { +static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) { + if (!enforce_authorization && !warn_authorization) { co_return; } auto resource = auth::resource(auth::resource_kind::data); @@ -936,7 +961,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli if (client_state.user() && client_state.user()->name) { username = client_state.user()->name.value(); } - throw api_error::access_denied(format( + authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( "CREATE access on ALL KEYSPACES is denied to role {}", username)); } } @@ -953,7 +978,7 @@ future executor::delete_table(client_state& clien schema_ptr schema = get_table(_proxy, request); rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats); co_await _mm.container().invoke_on(0, [&, 
cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> { size_t retries = mm.get_concurrent_ddl_retries(); for (;;) { @@ -1291,7 +1316,7 @@ future executor::tag_resource(client_state& clien if (tags->Size() < 1) { co_return api_error::validation("The number of tags must be at least 1") ; } - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats); co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map& tags_map) { update_tags_map(*tags, tags_map, update_tags_action::add_tags); }); @@ -1312,7 +1337,7 @@ future executor::untag_resource(client_state& cli schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn)); get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++; - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats); co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map& tags_map) { update_tags_map(*tags, tags_map, update_tags_action::delete_tags); }); @@ -1515,7 +1540,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector& out, } } -static future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) { +static future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) { 
SCYLLA_ASSERT(this_shard_id() == 0); // We begin by parsing and validating the content of the CreateTable @@ -1721,7 +1746,7 @@ static future create_table_on_shard0(service::cli set_table_creation_time(tags_map, db_clock::now()); builder.add_extension(db::tags_extension::NAME, ::make_shared(tags_map)); - co_await verify_create_permission(enforce_authorization, client_state); + co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats); schema_ptr schema = builder.build(); for (auto& view_builder : view_builders) { @@ -1822,9 +1847,9 @@ future executor::create_table(client_state& clien _stats.api_operations.create_table++; elogger.trace("Creating table {}", request); - co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)] + co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)] (service::migration_manager& mm) mutable -> future { - co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization); + co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats); }); } @@ -1877,7 +1902,7 @@ future executor::update_table(client_state& clien verify_billing_mode(request); } - co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = 
tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request] + co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()] (service::migration_manager& mm) mutable -> future { schema_ptr schema; size_t retries = mm.get_concurrent_ddl_retries(); @@ -2048,7 +2073,7 @@ future executor::update_table(client_state& clien co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified"); } - co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER); + co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats); auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector(), group0_guard.write_timestamp()); for (view_ptr view : new_views) { auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp()); @@ -2816,7 +2841,7 @@ future executor::put_item(client_state& client_st tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); const bool needs_read_before_write = op->needs_read_before_write(); - co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); auto cas_shard = op->shard_for_execute(needs_read_before_write); @@ 
-2920,7 +2945,7 @@ future executor::delete_item(client_state& client tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write(); - co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); auto cas_shard = op->shard_for_execute(needs_read_before_write); @@ -3198,7 +3223,7 @@ future executor::batch_write_item(client_state& c per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema)); } for (const auto& b : mutation_builders) { - co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats); } // If alternator_force_read_before_write is true we will first get the previous item size // and only then do send the mutation. 
@@ -4424,7 +4449,7 @@ future executor::update_item(client_state& client tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write(); - co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); auto cas_shard = op->shard_for_execute(needs_read_before_write); @@ -4535,7 +4560,7 @@ future executor::get_item(client_state& client_st const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames"); verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem"); rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats); service::storage_proxy::coordinator_query_result qr = co_await _proxy.query( schema, std::move(command), std::move(partition_ranges), cl, @@ -4667,7 +4692,7 @@ future executor::batch_get_item(client_state& cli } for (const table_requests& tr : requests) { - co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats); } _stats.api_operations.batch_get_item_batch_total += batch_size; @@ -5127,10 +5152,11 @@ static future do_query(service::storage_proxy& pr filter filter, query::partition_slice::option_set custom_opts, service::client_state& client_state, - cql3::cql_stats& cql_stats, + 
alternator::stats& stats, tracing::trace_state_ptr trace_state, service_permit permit, - bool enforce_authorization) { + bool enforce_authorization, + bool warn_authorization) { lw_shared_ptr old_paging_state = nullptr; tracing::trace(trace_state, "Performing a database query"); @@ -5157,7 +5183,7 @@ static future do_query(service::storage_proxy& pr old_paging_state = make_lw_shared(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0); } - co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT); + co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats); auto regular_columns = table_schema->regular_columns() | std::views::transform(&column_definition::id) @@ -5193,9 +5219,9 @@ static future do_query(service::storage_proxy& pr rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state)); } if (has_filter) { - cql_stats.filtered_rows_read_total += p->stats().rows_read_total; + stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total; // update our "filtered_row_matched_total" for all the rows matched, despited the filter - cql_stats.filtered_rows_matched_total += size; + stats.cql_stats.filtered_rows_matched_total += size; } if (opt_items) { if (opt_items->size() >= max_items_for_rapidjson_array) { @@ -5319,7 +5345,7 @@ future executor::scan(client_state& client_state, verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan"); return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, - std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization); + std::move(filter), query::partition_slice::option_set(), 
client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization); } static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) { @@ -5800,7 +5826,7 @@ future executor::query(client_state& client_state query::partition_slice::option_set opts; opts.set_if(!forward); return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, - std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization); + std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization); } future executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) { diff --git a/alternator/executor.hh b/alternator/executor.hh index de97c933fa..5a35c0fe40 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -139,6 +139,7 @@ class executor : public peering_sharded_service { db::system_distributed_keyspace& _sdks; cdc::metadata& _cdc_metadata; utils::updateable_value _enforce_authorization; + utils::updateable_value _warn_authorization; // An smp_service_group to be used for limiting the concurrency when // forwarding Alternator request between shards - if necessary for LWT. smp_service_group _ssg; @@ -264,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000); // Check CQL's Role-Based Access Control (RBAC) permission (MODIFY, // SELECT, DROP, etc.) on the given table. When permission is denied an // appropriate user-readable api_error::access_denied is thrown. 
-future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission); +future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats); /** * Make return type for serializing the object "streamed", diff --git a/alternator/server.cc b/alternator/server.cc index 4a4ae64b75..3f07aa2127 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -31,6 +31,7 @@ #include "utils/overloaded_functor.hh" #include "utils/aws_sigv4.hh" #include "client_data.hh" +#include "utils/updateable_value.hh" static logging::logger slogger("alternator-server"); @@ -270,24 +271,57 @@ protected: } }; +// This function increments the authentication_failures counter, and may also +// log a warn-level message and/or throw an exception, depending on what +// enforce_authorization and warn_authorization are set to. +// The username and client address are only used for logging purposes - +// they are not included in the error message returned to the client, since +// the client knows who it is. +// Note that if enforce_authorization is false, this function will return +// without throwing. So a caller that doesn't want to continue after an +// authentication_error must explicitly return after calling this function. 
+template +static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) { + stats.authentication_failures++; + if (enforce_authorization) { + if (warn_authorization) { + slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address); + } + throw std::move(e); + } else { + if (warn_authorization) { + slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address); + } + } +} + future server::verify_signature(const request& req, const chunked_content& content) { - if (!_enforce_authorization) { + if (!_enforce_authorization.get() && !_warn_authorization.get()) { slogger.debug("Skipping authorization"); return make_ready_future(); } auto host_it = req._headers.find("Host"); if (host_it == req._headers.end()) { - throw api_error::invalid_signature("Host header is mandatory for signature verification"); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::invalid_signature("Host header is mandatory for signature verification"), + "", req.get_client_address()); + return make_ready_future(); } auto authorization_it = req._headers.find("Authorization"); if (authorization_it == req._headers.end()) { - throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification"); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::missing_authentication_token("Authorization header is mandatory for signature verification"), + "", req.get_client_address()); + return make_ready_future(); } std::string host = host_it->second; std::string_view authorization_header = authorization_it->second; auto pos = authorization_header.find_first_of(' '); if (pos == 
std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") { - throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)), + "", req.get_client_address()); + return make_ready_future(); } authorization_header.remove_prefix(pos+1); std::string credential; @@ -322,7 +356,9 @@ future server::verify_signature(const request& req, const chunked_c std::vector credential_split = split(credential, '/'); if (credential_split.size() != 5) { - throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential)); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address()); + return make_ready_future(); } std::string user(credential_split[0]); std::string datestamp(credential_split[1]); @@ -346,7 +382,7 @@ future server::verify_signature(const request& req, const chunked_c auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) { return get_key_from_roles(proxy, as, std::move(username)); }; - return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content, + return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content, user = std::move(user), host = std::move(host), datestamp = std::move(datestamp), @@ -354,18 +390,32 @@ future server::verify_signature(const request& req, const chunked_c signed_headers_map = std::move(signed_headers_map), region = std::move(region), service = std::move(service), - user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) { + user_signature = 
std::move(user_signature)] (future key_ptr_fut) { + key_cache::value_ptr key_ptr(nullptr); + try { + key_ptr = key_ptr_fut.get(); + } catch (const api_error& e) { + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + e, user, req.get_client_address()); + return std::string(); + } std::string signature; try { signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method, datestamp, signed_headers_str, signed_headers_map, &content, region, service, ""); } catch (const std::exception& e) { - throw api_error::invalid_signature(e.what()); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())), + user, req.get_client_address()); + return std::string(); } if (signature != std::string_view(user_signature)) { _key_cache.remove(user); - throw api_error::unrecognized_client("The security token included in the request is invalid."); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::unrecognized_client("wrong signature"), + user, req.get_client_address()); + return std::string(); } return user; }); @@ -618,7 +668,6 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos , _auth_service(auth_service) , _sl_controller(sl_controller) , _key_cache(1024, 1min, slogger) - , _enforce_authorization(false) , _max_users_query_size_in_trace_output(1024) , _enabled_servers{} , _pending_requests("alternator::server::pending_requests") @@ -700,10 +749,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos } future<> server::init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - utils::updateable_value enforce_authorization, utils::updateable_value max_users_query_size_in_trace_output, + utils::updateable_value enforce_authorization, 
utils::updateable_value warn_authorization, utils::updateable_value max_users_query_size_in_trace_output, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests) { _memory_limiter = memory_limiter; _enforce_authorization = std::move(enforce_authorization); + _warn_authorization = std::move(warn_authorization); _max_concurrent_requests = std::move(max_concurrent_requests); _max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output); if (!port && !https_port) { diff --git a/alternator/server.hh b/alternator/server.hh index d966f24095..60d80fdc72 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -47,6 +47,7 @@ class server : public peering_sharded_service { key_cache _key_cache; utils::updateable_value _enforce_authorization; + utils::updateable_value _warn_authorization; utils::updateable_value _max_users_query_size_in_trace_output; utils::small_vector, 2> _enabled_servers; named_gate _pending_requests; @@ -99,7 +100,7 @@ public: server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller); future<> init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - utils::updateable_value enforce_authorization, utils::updateable_value max_users_query_size_in_trace_output, + utils::updateable_value enforce_authorization, utils::updateable_value warn_authorization, utils::updateable_value max_users_query_size_in_trace_output, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests); future<> stop(); // get_client_data() is called (on each shard separately) when the virtual diff --git a/alternator/stats.cc b/alternator/stats.cc index fe0b620e4d..ab590dbc91 100644 --- a/alternator/stats.cc +++ b/alternator/stats.cc @@ -188,6 +188,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups 
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses, seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty() }); + + // Only register the following metrics for the global metrics, not per-table + if (!has_table) { + metrics.add_group("alternator", { + seastar::metrics::make_counter("authentication_failures", stats.authentication_failures, + seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), + seastar::metrics::make_counter("authorization_failures", stats.authorization_failures, + seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), + }); + } } void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) { diff --git a/alternator/stats.hh b/alternator/stats.hh index ad03221e9f..8f39212b53 100644 --- a/alternator/stats.hh +++ b/alternator/stats.hh @@ -105,6 +105,17 @@ public: // The sizes are the the written items' sizes grouped per table. utils::estimated_histogram batch_write_item_op_size_kb{30}; } operation_sizes; + // Count of authentication and authorization failures, counted if either + // alternator_enforce_authorization or alternator_warn_authorization are + // set to true. If both are false, no authentication or authorization + // checks are performed, so failures are not recognized or counted. + // "authentication" failure means the request was not signed with a valid + // user and key combination. "authorization" failure means the request was + // authenticated to a valid user - but this user did not have permissions + // to perform the operation (considering RBAC settings and the user's + // superuser status). 
+ uint64_t authentication_failures = 0; + uint64_t authorization_failures = 0; // Miscellaneous event counters uint64_t total_operations = 0; uint64_t unsupported_operations = 0; diff --git a/alternator/streams.cc b/alternator/streams.cc index fcdac0cd08..57ae78ccf2 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -827,7 +827,7 @@ future executor::get_records(client_state& client tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats); db::consistency_level cl = db::consistency_level::LOCAL_QUORUM; partition_key pk = iter.shard.id.to_partition_key(*schema); diff --git a/alternator/ttl.cc b/alternator/ttl.cc index 3131e0fa2e..81e83fa225 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -95,7 +95,7 @@ future executor::update_time_to_live(client_state } sstring attribute_name(v->GetString(), v->GetStringLength()); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats); co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map& tags_map) { if (enabled) { if (tags_map.contains(TTL_TAG_KEY)) { diff --git a/db/config.cc b/db/config.cc index c29145785b..cbe733384b 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1425,7 +1425,8 @@ db::config::config(std::shared_ptr exts) , alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.") , alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.") , alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.") - , 
alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.") + , alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.") + , alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization") , alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.") , alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.") , alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index 0f477e63a5..a24dd5d945 100644 --- a/db/config.hh +++ b/db/config.hh @@ -458,6 +458,7 @@ public: named_value alternator_https_port; named_value alternator_address; named_value alternator_enforce_authorization; + named_value alternator_warn_authorization; named_value alternator_write_isolation; named_value alternator_streams_time_window_s; named_value alternator_timeout_in_ms;