diff --git a/alternator/controller.cc b/alternator/controller.cc index 0cb889a2d2..64e6114f75 100644 --- a/alternator/controller.cc +++ b/alternator/controller.cc @@ -136,6 +136,7 @@ future<> controller::start_server() { [this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable { return server.init(addr, alternator_port, alternator_https_port, creds, _config.alternator_enforce_authorization, + _config.alternator_warn_authorization, _config.alternator_max_users_query_size_in_trace_output, &_memory_limiter.local().get_semaphore(), _config.max_concurrent_requests_per_shard); diff --git a/alternator/executor.cc b/alternator/executor.cc index 28ac9a734b..8979d7332e 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -244,7 +244,8 @@ executor::executor(gms::gossiper& gossiper, _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), - _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()), + _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization), + _warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization), _ssg(ssg), _parsed_expression_cache(std::make_unique( parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard}, @@ -880,15 +881,37 @@ future executor::describe_table(client_state& cli co_return rjson::print(std::move(response)); } +// This function increments the authorization_failures counter, and may also +// log a warn-level message and/or throw an access_denied exception, depending +// on what enforce_authorization and warn_authorization are set to. +// Note that if enforce_authorization is false, this function will return +// without throwing. So a caller that doesn't want to continue after an +// authorization_error must explicitly return after calling this function. +static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) { + stats.authorization_failures++; + if (enforce_authorization) { + if (warn_authorization) { + elogger.warn("alternator_warn_authorization=true: {}", msg); + } + throw api_error::access_denied(std::move(msg)); + } else { + if (warn_authorization) { + elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg); + } + } +} + // Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY, // SELECT, DROP, etc.) on the given table. When permission is denied an // appropriate user-readable api_error::access_denied is thrown. future<> verify_permission( bool enforce_authorization, + bool warn_authorization, const service::client_state& client_state, const schema_ptr& schema, - auth::permission permission_to_check) { - if (!enforce_authorization) { + auth::permission permission_to_check, + alternator::stats& stats) { + if (!enforce_authorization && !warn_authorization) { co_return; } // Unfortunately, the fix for issue #23218 did not modify the function @@ -903,31 +926,33 @@ future<> verify_permission( if (client_state.user() && client_state.user()->name) { username = client_state.user()->name.value(); } - throw api_error::access_denied(fmt::format( + authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( "Write access denied on internal table {}.{} to role {} because it is not a superuser", schema->ks_name(), schema->cf_name(), username)); + co_return; } } auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name()); - if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) { + if (!client_state.user() || !client_state.user()->name || + !co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) { sstring username = ""; if (client_state.user() && client_state.user()->name) { username = client_state.user()->name.value(); } // Using exceptions for errors makes this function faster in the // success path (when the operation is allowed). - throw api_error::access_denied(format( - "{} access on table {}.{} is denied to role {}", + authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( + "{} access on table {}.{} is denied to role {}, client address {}", auth::permissions::to_string(permission_to_check), - schema->ks_name(), schema->cf_name(), username)); + schema->ks_name(), schema->cf_name(), username, client_state.get_client_address())); } } // Similar to verify_permission() above, but just for CREATE operations. // Those do not operate on any specific table, so require permissions on // ALL KEYSPACES instead of any specific table. -future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) { - if (!enforce_authorization) { +static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) { + if (!enforce_authorization && !warn_authorization) { co_return; } auto resource = auth::resource(auth::resource_kind::data); @@ -936,7 +961,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli if (client_state.user() && client_state.user()->name) { username = client_state.user()->name.value(); } - throw api_error::access_denied(format( + authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( "CREATE access on ALL KEYSPACES is denied to role {}", username)); } } @@ -953,7 +978,7 @@ future executor::delete_table(client_state& clien schema_ptr schema = get_table(_proxy, request); rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats); co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> { size_t retries = mm.get_concurrent_ddl_retries(); for (;;) { @@ -1291,7 +1316,7 @@ future executor::tag_resource(client_state& clien if (tags->Size() < 1) { co_return api_error::validation("The number of tags must be at least 1") ; } - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats); co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map& tags_map) { update_tags_map(*tags, tags_map, update_tags_action::add_tags); }); @@ -1312,7 +1337,7 @@ future executor::untag_resource(client_state& cli schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn)); get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++; - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats); co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map& tags_map) { update_tags_map(*tags, tags_map, update_tags_action::delete_tags); }); @@ -1515,7 +1540,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector& out, } } -static future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) { +static future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) { SCYLLA_ASSERT(this_shard_id() == 0); // We begin by parsing and validating the content of the CreateTable @@ -1721,7 +1746,7 @@ static future create_table_on_shard0(service::cli set_table_creation_time(tags_map, db_clock::now()); builder.add_extension(db::tags_extension::NAME, ::make_shared(tags_map)); - co_await verify_create_permission(enforce_authorization, client_state); + co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats); schema_ptr schema = builder.build(); for (auto& view_builder : view_builders) { @@ -1822,9 +1847,9 @@ future executor::create_table(client_state& clien _stats.api_operations.create_table++; elogger.trace("Creating table {}", request); - co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)] + co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)] (service::migration_manager& mm) mutable -> future { - co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization); + co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats); }); } @@ -1877,7 +1902,7 @@ future executor::update_table(client_state& clien verify_billing_mode(request); } - co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request] + co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()] (service::migration_manager& mm) mutable -> future { schema_ptr schema; size_t retries = mm.get_concurrent_ddl_retries(); @@ -2048,7 +2073,7 @@ future executor::update_table(client_state& clien co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified"); } - co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER); + co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats); auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector(), group0_guard.write_timestamp()); for (view_ptr view : new_views) { auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp()); @@ -2816,7 +2841,7 @@ future executor::put_item(client_state& client_st tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); const bool needs_read_before_write = op->needs_read_before_write(); - co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); auto cas_shard = op->shard_for_execute(needs_read_before_write); @@ -2920,7 +2945,7 @@ future executor::delete_item(client_state& client tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write(); - co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); auto cas_shard = op->shard_for_execute(needs_read_before_write); @@ -3198,7 +3223,7 @@ future executor::batch_write_item(client_state& c per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema)); } for (const auto& b : mutation_builders) { - co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats); } // If alternator_force_read_before_write is true we will first get the previous item size // and only then do send the mutation. @@ -4424,7 +4449,7 @@ future executor::update_item(client_state& client tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write(); - co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); auto cas_shard = op->shard_for_execute(needs_read_before_write); @@ -4535,7 +4560,7 @@ future executor::get_item(client_state& client_st const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames"); verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem"); rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats); service::storage_proxy::coordinator_query_result qr = co_await _proxy.query( schema, std::move(command), std::move(partition_ranges), cl, @@ -4667,7 +4692,7 @@ future executor::batch_get_item(client_state& cli } for (const table_requests& tr : requests) { - co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats); } _stats.api_operations.batch_get_item_batch_total += batch_size; @@ -5127,10 +5152,11 @@ static future do_query(service::storage_proxy& pr filter filter, query::partition_slice::option_set custom_opts, service::client_state& client_state, - cql3::cql_stats& cql_stats, + alternator::stats& stats, tracing::trace_state_ptr trace_state, service_permit permit, - bool enforce_authorization) { + bool enforce_authorization, + bool warn_authorization) { lw_shared_ptr old_paging_state = nullptr; tracing::trace(trace_state, "Performing a database query"); @@ -5157,7 +5183,7 @@ static future do_query(service::storage_proxy& pr old_paging_state = make_lw_shared(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0); } - co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT); + co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats); auto regular_columns = table_schema->regular_columns() | std::views::transform(&column_definition::id) @@ -5193,9 +5219,9 @@ static future do_query(service::storage_proxy& pr rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state)); } if (has_filter) { - cql_stats.filtered_rows_read_total += p->stats().rows_read_total; + stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total; // update our "filtered_row_matched_total" for all the rows matched, despited the filter - cql_stats.filtered_rows_matched_total += size; + stats.cql_stats.filtered_rows_matched_total += size; } if (opt_items) { if (opt_items->size() >= max_items_for_rapidjson_array) { @@ -5319,7 +5345,7 @@ future executor::scan(client_state& client_state, verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan"); return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, - std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization); + std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization); } static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) { @@ -5800,7 +5826,7 @@ future executor::query(client_state& client_state query::partition_slice::option_set opts; opts.set_if(!forward); return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, - std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization); + std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization); } future executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) { diff --git a/alternator/executor.hh b/alternator/executor.hh index de97c933fa..5a35c0fe40 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -139,6 +139,7 @@ class executor : public peering_sharded_service { db::system_distributed_keyspace& _sdks; cdc::metadata& _cdc_metadata; utils::updateable_value _enforce_authorization; + utils::updateable_value _warn_authorization; // An smp_service_group to be used for limiting the concurrency when // forwarding Alternator request between shards - if necessary for LWT. smp_service_group _ssg; @@ -264,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000); // Check CQL's Role-Based Access Control (RBAC) permission (MODIFY, // SELECT, DROP, etc.) on the given table. When permission is denied an // appropriate user-readable api_error::access_denied is thrown. -future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission); +future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats); /** * Make return type for serializing the object "streamed", diff --git a/alternator/server.cc b/alternator/server.cc index 4a4ae64b75..3f07aa2127 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -31,6 +31,7 @@ #include "utils/overloaded_functor.hh" #include "utils/aws_sigv4.hh" #include "client_data.hh" +#include "utils/updateable_value.hh" static logging::logger slogger("alternator-server"); @@ -270,24 +271,57 @@ protected: } }; +// This function increments the authentication_failures counter, and may also +// log a warn-level message and/or throw an exception, depending on what +// enforce_authorization and warn_authorization are set to. +// The username and client address are only used for logging purposes - +// they are not included in the error message returned to the client, since +// the client knows who it is. +// Note that if enforce_authorization is false, this function will return +// without throwing. So a caller that doesn't want to continue after an +// authentication_error must explicitly return after calling this function. +template +static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) { + stats.authentication_failures++; + if (enforce_authorization) { + if (warn_authorization) { + slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address); + } + throw std::move(e); + } else { + if (warn_authorization) { + slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address); + } + } +} + future server::verify_signature(const request& req, const chunked_content& content) { - if (!_enforce_authorization) { + if (!_enforce_authorization.get() && !_warn_authorization.get()) { slogger.debug("Skipping authorization"); return make_ready_future(); } auto host_it = req._headers.find("Host"); if (host_it == req._headers.end()) { - throw api_error::invalid_signature("Host header is mandatory for signature verification"); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::invalid_signature("Host header is mandatory for signature verification"), + "", req.get_client_address()); + return make_ready_future(); } auto authorization_it = req._headers.find("Authorization"); if (authorization_it == req._headers.end()) { - throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification"); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::missing_authentication_token("Authorization header is mandatory for signature verification"), + "", req.get_client_address()); + return make_ready_future(); } std::string host = host_it->second; std::string_view authorization_header = authorization_it->second; auto pos = authorization_header.find_first_of(' '); if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") { - throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)), + "", req.get_client_address()); + return make_ready_future(); } authorization_header.remove_prefix(pos+1); std::string credential; @@ -322,7 +356,9 @@ future server::verify_signature(const request& req, const chunked_c std::vector credential_split = split(credential, '/'); if (credential_split.size() != 5) { - throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential)); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address()); + return make_ready_future(); } std::string user(credential_split[0]); std::string datestamp(credential_split[1]); @@ -346,7 +382,7 @@ future server::verify_signature(const request& req, const chunked_c auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) { return get_key_from_roles(proxy, as, std::move(username)); }; - return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content, + return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content, user = std::move(user), host = std::move(host), datestamp = std::move(datestamp), @@ -354,18 +390,32 @@ future server::verify_signature(const request& req, const chunked_c signed_headers_map = std::move(signed_headers_map), region = std::move(region), service = std::move(service), - user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) { + user_signature = std::move(user_signature)] (future key_ptr_fut) { + key_cache::value_ptr key_ptr(nullptr); + try { + key_ptr = key_ptr_fut.get(); + } catch (const api_error& e) { + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + e, user, req.get_client_address()); + return std::string(); + } std::string signature; try { signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method, datestamp, signed_headers_str, signed_headers_map, &content, region, service, ""); } catch (const std::exception& e) { - throw api_error::invalid_signature(e.what()); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())), + user, req.get_client_address()); + return std::string(); } if (signature != std::string_view(user_signature)) { _key_cache.remove(user); - throw api_error::unrecognized_client("The security token included in the request is invalid."); + authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(), + api_error::unrecognized_client("wrong signature"), + user, req.get_client_address()); + return std::string(); } return user; }); @@ -618,7 +668,6 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos , _auth_service(auth_service) , _sl_controller(sl_controller) , _key_cache(1024, 1min, slogger) - , _enforce_authorization(false) , _max_users_query_size_in_trace_output(1024) , _enabled_servers{} , _pending_requests("alternator::server::pending_requests") @@ -700,10 +749,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos } future<> server::init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - utils::updateable_value enforce_authorization, utils::updateable_value max_users_query_size_in_trace_output, + utils::updateable_value enforce_authorization, utils::updateable_value warn_authorization, utils::updateable_value max_users_query_size_in_trace_output, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests) { _memory_limiter = memory_limiter; _enforce_authorization = std::move(enforce_authorization); + _warn_authorization = std::move(warn_authorization); _max_concurrent_requests = std::move(max_concurrent_requests); _max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output); if (!port && !https_port) { diff --git a/alternator/server.hh b/alternator/server.hh index d966f24095..60d80fdc72 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -47,6 +47,7 @@ class server : public peering_sharded_service { key_cache _key_cache; utils::updateable_value _enforce_authorization; + utils::updateable_value _warn_authorization; utils::updateable_value _max_users_query_size_in_trace_output; utils::small_vector, 2> _enabled_servers; named_gate _pending_requests; @@ -99,7 +100,7 @@ public: server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller); future<> init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - utils::updateable_value enforce_authorization, utils::updateable_value max_users_query_size_in_trace_output, + utils::updateable_value enforce_authorization, utils::updateable_value warn_authorization, utils::updateable_value max_users_query_size_in_trace_output, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests); future<> stop(); // get_client_data() is called (on each shard separately) when the virtual diff --git a/alternator/stats.cc b/alternator/stats.cc index fe0b620e4d..ab590dbc91 100644 --- a/alternator/stats.cc +++ b/alternator/stats.cc @@ -188,6 +188,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses, seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty() }); + + // Only register the following metrics for the global metrics, not per-table + if (!has_table) { + metrics.add_group("alternator", { + seastar::metrics::make_counter("authentication_failures", stats.authentication_failures, + seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), + seastar::metrics::make_counter("authorization_failures", stats.authorization_failures, + seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), + }); + } } void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) { diff --git a/alternator/stats.hh b/alternator/stats.hh index ad03221e9f..8f39212b53 100644 --- a/alternator/stats.hh +++ b/alternator/stats.hh @@ -105,6 +105,17 @@ public: // The sizes are the the written items' sizes grouped per table. utils::estimated_histogram batch_write_item_op_size_kb{30}; } operation_sizes; + // Count of authentication and authorization failures, counted if either + // alternator_enforce_authorization or alternator_warn_authorization are + // set to true. If both are false, no authentication or authorization + // checks are performed, so failures are not recognized or counted. + // "authentication" failure means the request was not signed with a valid + // user and key combination. "authorization" failure means the request was + // authenticated to a valid user - but this user did not have permissions + // to perform the operation (considering RBAC settings and the user's + // superuser status). + uint64_t authentication_failures = 0; + uint64_t authorization_failures = 0; // Miscellaneous event counters uint64_t total_operations = 0; uint64_t unsupported_operations = 0; diff --git a/alternator/streams.cc b/alternator/streams.cc index fcdac0cd08..57ae78ccf2 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -827,7 +827,7 @@ future executor::get_records(client_state& client tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats); db::consistency_level cl = db::consistency_level::LOCAL_QUORUM; partition_key pk = iter.shard.id.to_partition_key(*schema); diff --git a/alternator/ttl.cc b/alternator/ttl.cc index 3131e0fa2e..81e83fa225 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -95,7 +95,7 @@ future executor::update_time_to_live(client_state } sstring attribute_name(v->GetString(), v->GetStringLength()); - co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER); + co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats); co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map& tags_map) { if (enabled) { if (tags_map.contains(TTL_TAG_KEY)) { diff --git a/db/config.cc b/db/config.cc index c29145785b..cbe733384b 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1425,7 +1425,8 @@ db::config::config(std::shared_ptr exts) , alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.") , alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.") , alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.") - , alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.") + , alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.") + , alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization") , alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.") , alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.") , alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index 0f477e63a5..a24dd5d945 100644 --- a/db/config.hh +++ b/db/config.hh @@ -458,6 +458,7 @@ public: named_value alternator_https_port; named_value alternator_address; named_value alternator_enforce_authorization; + named_value alternator_warn_authorization; named_value alternator_write_isolation; named_value alternator_streams_time_window_s; named_value alternator_timeout_in_ms;