alternator: add alternator_warn_authorization config
Before this patch, the configuration alternator_enforce_authorization
is a boolean: true means enforce authentication checks (i.e., each
request is signed by a valid user) and authorization checks (the user
who signed the request is allowed by RBAC to perform this request).
This patch adds a second boolean configuration option,
alternator_warn_authorization. When alternator_enforce_authorization
is false but alternator_warn_authorization is true, authentication and
authorization checks are performed as in enforce mode, but failures
are ignored and counted in two new metrics:
scylla_alternator_authentication_failures
scylla_alternator_authorization_failures
additionally,also each authentication or authorization error is logged as
a WARN-level log message. Some users prefer those log messages over
metrics, as the log messages contain additional information about the
failure that can be useful - such as the address of the misconfigured
client, or the username attempted in the request.
All combinations of the two configuration options are allowed:
* If just "enforce" is true, auth failures cause a request failure.
The failures are counted, but not logged.
* If both "enforce" and "warn" are true, auth failures cause a request
failure. The failures are both counted and logged.
* If just "warn" is true, auth failures are ignored (the request
is allowed to compelete) but are counted and logged.
* If neither "enforce" nor "warn" are true, no authentication or
authorization check are done at all. So we don't know about failures,
so naturally we don't count them and don't log them.
This patch is fairly straightforward, doing mainly the following
things:
1. Add an alternator_warn_authorization config parameter.
2. Make sure alternator_enforce_authorization is live-updatable (we'll
use this in a test in the next patch). It "almost" was, but a typo
prevented the live update from working properly.
3. Add the two new metrics, and increment them in every type of
authentication or authorization error.
Some code that needs to increment these new metrics didn't have
access to the "stats" object, so we had to pass it around more.
4. Add log messages when alternator_warn_authorization is true.
5. If alternator_enforce_authorization is false, allow the auth check
to allow the request to proceed (after having counted and/or logged
the auth error).
A separate patch will follow and add documentation suggesting to users
how to use the new "warn" options to safely switch between non-enforcing
to enforcing mode. Another patch will add tests for the new configuration
options, new metrics and new log messages.
Fixes #25308.
Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
@@ -136,6 +136,7 @@ future<> controller::start_server() {
|
||||
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
|
||||
return server.init(addr, alternator_port, alternator_https_port, creds,
|
||||
_config.alternator_enforce_authorization,
|
||||
_config.alternator_warn_authorization,
|
||||
_config.alternator_max_users_query_size_in_trace_output,
|
||||
&_memory_limiter.local().get_semaphore(),
|
||||
_config.max_concurrent_requests_per_shard);
|
||||
|
||||
@@ -244,7 +244,8 @@ executor::executor(gms::gossiper& gossiper,
|
||||
_mm(mm),
|
||||
_sdks(sdks),
|
||||
_cdc_metadata(cdc_metadata),
|
||||
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()),
|
||||
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
|
||||
_warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization),
|
||||
_ssg(ssg),
|
||||
_parsed_expression_cache(std::make_unique<parsed::expression_cache>(
|
||||
parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard},
|
||||
@@ -880,15 +881,37 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
co_return rjson::print(std::move(response));
|
||||
}
|
||||
|
||||
// This function increments the authorization_failures counter, and may also
|
||||
// log a warn-level message and/or throw an access_denied exception, depending
|
||||
// on what enforce_authorization and warn_authorization are set to.
|
||||
// Note that if enforce_authorization is false, this function will return
|
||||
// without throwing. So a caller that doesn't want to continue after an
|
||||
// authorization_error must explicitly return after calling this function.
|
||||
static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
|
||||
stats.authorization_failures++;
|
||||
if (enforce_authorization) {
|
||||
if (warn_authorization) {
|
||||
elogger.warn("alternator_warn_authorization=true: {}", msg);
|
||||
}
|
||||
throw api_error::access_denied(std::move(msg));
|
||||
} else {
|
||||
if (warn_authorization) {
|
||||
elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
|
||||
// SELECT, DROP, etc.) on the given table. When permission is denied an
|
||||
// appropriate user-readable api_error::access_denied is thrown.
|
||||
future<> verify_permission(
|
||||
bool enforce_authorization,
|
||||
bool warn_authorization,
|
||||
const service::client_state& client_state,
|
||||
const schema_ptr& schema,
|
||||
auth::permission permission_to_check) {
|
||||
if (!enforce_authorization) {
|
||||
auth::permission permission_to_check,
|
||||
alternator::stats& stats) {
|
||||
if (!enforce_authorization && !warn_authorization) {
|
||||
co_return;
|
||||
}
|
||||
// Unfortunately, the fix for issue #23218 did not modify the function
|
||||
@@ -903,31 +926,33 @@ future<> verify_permission(
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
throw api_error::access_denied(fmt::format(
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"Write access denied on internal table {}.{} to role {} because it is not a superuser",
|
||||
schema->ks_name(), schema->cf_name(), username));
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
|
||||
if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
|
||||
if (!client_state.user() || !client_state.user()->name ||
|
||||
!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
|
||||
sstring username = "<anonymous>";
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
// Using exceptions for errors makes this function faster in the
|
||||
// success path (when the operation is allowed).
|
||||
throw api_error::access_denied(format(
|
||||
"{} access on table {}.{} is denied to role {}",
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"{} access on table {}.{} is denied to role {}, client address {}",
|
||||
auth::permissions::to_string(permission_to_check),
|
||||
schema->ks_name(), schema->cf_name(), username));
|
||||
schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
|
||||
}
|
||||
}
|
||||
|
||||
// Similar to verify_permission() above, but just for CREATE operations.
|
||||
// Those do not operate on any specific table, so require permissions on
|
||||
// ALL KEYSPACES instead of any specific table.
|
||||
future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) {
|
||||
if (!enforce_authorization) {
|
||||
static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) {
|
||||
if (!enforce_authorization && !warn_authorization) {
|
||||
co_return;
|
||||
}
|
||||
auto resource = auth::resource(auth::resource_kind::data);
|
||||
@@ -936,7 +961,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
throw api_error::access_denied(format(
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"CREATE access on ALL KEYSPACES is denied to role {}", username));
|
||||
}
|
||||
}
|
||||
@@ -953,7 +978,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
for (;;) {
|
||||
@@ -1291,7 +1316,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
|
||||
if (tags->Size() < 1) {
|
||||
co_return api_error::validation("The number of tags must be at least 1") ;
|
||||
}
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
|
||||
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
|
||||
});
|
||||
@@ -1312,7 +1337,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
|
||||
|
||||
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++;
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
|
||||
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
|
||||
});
|
||||
@@ -1515,7 +1540,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
|
||||
}
|
||||
}
|
||||
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
@@ -1721,7 +1746,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
set_table_creation_time(tags_map, db_clock::now());
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
|
||||
co_await verify_create_permission(enforce_authorization, client_state);
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
for (auto& view_builder : view_builders) {
|
||||
@@ -1822,9 +1847,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)]
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization);
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1877,7 +1902,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
verify_billing_mode(request);
|
||||
}
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
schema_ptr schema;
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
@@ -2048,7 +2073,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
|
||||
co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats);
|
||||
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
|
||||
for (view_ptr view : new_views) {
|
||||
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
|
||||
@@ -2816,7 +2841,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -2920,7 +2945,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -3198,7 +3223,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema));
|
||||
}
|
||||
for (const auto& b : mutation_builders) {
|
||||
co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats);
|
||||
}
|
||||
// If alternator_force_read_before_write is true we will first get the previous item size
|
||||
// and only then do send the mutation.
|
||||
@@ -4424,7 +4449,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -4535,7 +4560,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
|
||||
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
|
||||
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem");
|
||||
rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
||||
service::storage_proxy::coordinator_query_result qr =
|
||||
co_await _proxy.query(
|
||||
schema, std::move(command), std::move(partition_ranges), cl,
|
||||
@@ -4667,7 +4692,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
|
||||
for (const table_requests& tr : requests) {
|
||||
co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
@@ -5127,10 +5152,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
filter filter,
|
||||
query::partition_slice::option_set custom_opts,
|
||||
service::client_state& client_state,
|
||||
cql3::cql_stats& cql_stats,
|
||||
alternator::stats& stats,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit,
|
||||
bool enforce_authorization) {
|
||||
bool enforce_authorization,
|
||||
bool warn_authorization) {
|
||||
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
|
||||
|
||||
tracing::trace(trace_state, "Performing a database query");
|
||||
@@ -5157,7 +5183,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT);
|
||||
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
|
||||
|
||||
auto regular_columns =
|
||||
table_schema->regular_columns() | std::views::transform(&column_definition::id)
|
||||
@@ -5193,9 +5219,9 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
|
||||
}
|
||||
if (has_filter) {
|
||||
cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
|
||||
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
|
||||
// update our "filtered_row_matched_total" for all the rows matched, despited the filter
|
||||
cql_stats.filtered_rows_matched_total += size;
|
||||
stats.cql_stats.filtered_rows_matched_total += size;
|
||||
}
|
||||
if (opt_items) {
|
||||
if (opt_items->size() >= max_items_for_rapidjson_array) {
|
||||
@@ -5319,7 +5345,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
||||
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");
|
||||
|
||||
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||
std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization);
|
||||
std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
|
||||
}
|
||||
|
||||
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
|
||||
@@ -5800,7 +5826,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
|
||||
query::partition_slice::option_set opts;
|
||||
opts.set_if<query::partition_slice::option::reversed>(!forward);
|
||||
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||
std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization);
|
||||
std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
|
||||
@@ -139,6 +139,7 @@ class executor : public peering_sharded_service<executor> {
|
||||
db::system_distributed_keyspace& _sdks;
|
||||
cdc::metadata& _cdc_metadata;
|
||||
utils::updateable_value<bool> _enforce_authorization;
|
||||
utils::updateable_value<bool> _warn_authorization;
|
||||
// An smp_service_group to be used for limiting the concurrency when
|
||||
// forwarding Alternator request between shards - if necessary for LWT.
|
||||
smp_service_group _ssg;
|
||||
@@ -264,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
|
||||
// SELECT, DROP, etc.) on the given table. When permission is denied an
|
||||
// appropriate user-readable api_error::access_denied is thrown.
|
||||
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
|
||||
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
|
||||
|
||||
/**
|
||||
* Make return type for serializing the object "streamed",
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "utils/aws_sigv4.hh"
|
||||
#include "client_data.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
static logging::logger slogger("alternator-server");
|
||||
|
||||
@@ -270,24 +271,57 @@ protected:
|
||||
}
|
||||
};
|
||||
|
||||
// This function increments the authentication_failures counter, and may also
|
||||
// log a warn-level message and/or throw an exception, depending on what
|
||||
// enforce_authorization and warn_authorization are set to.
|
||||
// The username and client address are only used for logging purposes -
|
||||
// they are not included in the error message returned to the client, since
|
||||
// the client knows who it is.
|
||||
// Note that if enforce_authorization is false, this function will return
|
||||
// without throwing. So a caller that doesn't want to continue after an
|
||||
// authentication_error must explicitly return after calling this function.
|
||||
template<typename Exception>
|
||||
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
|
||||
stats.authentication_failures++;
|
||||
if (enforce_authorization) {
|
||||
if (warn_authorization) {
|
||||
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
|
||||
}
|
||||
throw std::move(e);
|
||||
} else {
|
||||
if (warn_authorization) {
|
||||
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
|
||||
if (!_enforce_authorization) {
|
||||
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
|
||||
slogger.debug("Skipping authorization");
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
auto host_it = req._headers.find("Host");
|
||||
if (host_it == req._headers.end()) {
|
||||
throw api_error::invalid_signature("Host header is mandatory for signature verification");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature("Host header is mandatory for signature verification"),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
auto authorization_it = req._headers.find("Authorization");
|
||||
if (authorization_it == req._headers.end()) {
|
||||
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
std::string host = host_it->second;
|
||||
std::string_view authorization_header = authorization_it->second;
|
||||
auto pos = authorization_header.find_first_of(' ');
|
||||
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
authorization_header.remove_prefix(pos+1);
|
||||
std::string credential;
|
||||
@@ -322,7 +356,9 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
|
||||
std::vector<std::string_view> credential_split = split(credential, '/');
|
||||
if (credential_split.size() != 5) {
|
||||
throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
std::string user(credential_split[0]);
|
||||
std::string datestamp(credential_split[1]);
|
||||
@@ -346,7 +382,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
|
||||
return get_key_from_roles(proxy, as, std::move(username));
|
||||
};
|
||||
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
|
||||
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
|
||||
user = std::move(user),
|
||||
host = std::move(host),
|
||||
datestamp = std::move(datestamp),
|
||||
@@ -354,18 +390,32 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
signed_headers_map = std::move(signed_headers_map),
|
||||
region = std::move(region),
|
||||
service = std::move(service),
|
||||
user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
|
||||
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
|
||||
key_cache::value_ptr key_ptr(nullptr);
|
||||
try {
|
||||
key_ptr = key_ptr_fut.get();
|
||||
} catch (const api_error& e) {
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
e, user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
std::string signature;
|
||||
try {
|
||||
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
|
||||
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
|
||||
} catch (const std::exception& e) {
|
||||
throw api_error::invalid_signature(e.what());
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
|
||||
user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
|
||||
if (signature != std::string_view(user_signature)) {
|
||||
_key_cache.remove(user);
|
||||
throw api_error::unrecognized_client("The security token included in the request is invalid.");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::unrecognized_client("wrong signature"),
|
||||
user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
return user;
|
||||
});
|
||||
@@ -618,7 +668,6 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
|
||||
, _auth_service(auth_service)
|
||||
, _sl_controller(sl_controller)
|
||||
, _key_cache(1024, 1min, slogger)
|
||||
, _enforce_authorization(false)
|
||||
, _max_users_query_size_in_trace_output(1024)
|
||||
, _enabled_servers{}
|
||||
, _pending_requests("alternator::server::pending_requests")
|
||||
@@ -700,10 +749,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
|
||||
}
|
||||
|
||||
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
|
||||
_memory_limiter = memory_limiter;
|
||||
_enforce_authorization = std::move(enforce_authorization);
|
||||
_warn_authorization = std::move(warn_authorization);
|
||||
_max_concurrent_requests = std::move(max_concurrent_requests);
|
||||
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
|
||||
if (!port && !https_port) {
|
||||
|
||||
@@ -47,6 +47,7 @@ class server : public peering_sharded_service<server> {
|
||||
|
||||
key_cache _key_cache;
|
||||
utils::updateable_value<bool> _enforce_authorization;
|
||||
utils::updateable_value<bool> _warn_authorization;
|
||||
utils::updateable_value<uint64_t> _max_users_query_size_in_trace_output;
|
||||
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
|
||||
named_gate _pending_requests;
|
||||
@@ -99,7 +100,7 @@ public:
|
||||
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
|
||||
|
||||
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
|
||||
future<> stop();
|
||||
// get_client_data() is called (on each shard separately) when the virtual
|
||||
|
||||
@@ -188,6 +188,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
|
||||
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
|
||||
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
|
||||
});
|
||||
|
||||
// Only register the following metrics for the global metrics, not per-table
|
||||
if (!has_table) {
|
||||
metrics.add_group("alternator", {
|
||||
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
|
||||
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
|
||||
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {
|
||||
|
||||
@@ -105,6 +105,17 @@ public:
|
||||
// The sizes are the the written items' sizes grouped per table.
|
||||
utils::estimated_histogram batch_write_item_op_size_kb{30};
|
||||
} operation_sizes;
|
||||
// Count of authentication and authorization failures, counted if either
|
||||
// alternator_enforce_authorization or alternator_warn_authorization are
|
||||
// set to true. If both are false, no authentication or authorization
|
||||
// checks are performed, so failures are not recognized or counted.
|
||||
// "authentication" failure means the request was not signed with a valid
|
||||
// user and key combination. "authorization" failure means the request was
|
||||
// authenticated to a valid user - but this user did not have permissions
|
||||
// to perform the operation (considering RBAC settings and the user's
|
||||
// superuser status).
|
||||
uint64_t authentication_failures = 0;
|
||||
uint64_t authorization_failures = 0;
|
||||
// Miscellaneous event counters
|
||||
uint64_t total_operations = 0;
|
||||
uint64_t unsupported_operations = 0;
|
||||
|
||||
@@ -827,7 +827,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
||||
|
||||
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
|
||||
partition_key pk = iter.shard.id.to_partition_key(*schema);
|
||||
|
||||
@@ -95,7 +95,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
}
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
if (enabled) {
|
||||
if (tags_map.contains(TTL_TAG_KEY)) {
|
||||
|
||||
@@ -1425,7 +1425,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
|
||||
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
|
||||
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
|
||||
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
|
||||
|
||||
@@ -458,6 +458,7 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<bool> alternator_warn_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
named_value<uint32_t> alternator_streams_time_window_s;
|
||||
named_value<uint32_t> alternator_timeout_in_ms;
|
||||
|
||||
Reference in New Issue
Block a user