auth: Switch to sharded service
This change appears quite large, but is logically fairly simple. Previously, the `auth` module was structured around global state in a number of ways: - There existed global instances for the authenticator and the authorizer, which were accessed pervasively throughout the system through `auth::authenticator::get()` and `auth::authorizer::get()`, respectively. These instances needed to be initialized before they could be used with `auth::authenticator::setup(sstring type_name)` and `auth::authorizer::setup(sstring type_name)`. - The implementation of the `auth::auth` functions and the authenticator and authorizer depended on resources accessed globally through `cql3::get_local_query_processor()` and `service::get_local_migration_manager()`. - CQL statements would check for access and manage users through static functions in `auth::auth`. These functions would access the global authenticator and authorizer instances and depended on the necessary systems being started before they were used. This change eliminates global state from all of these. The specific changes are: - Move out `allow_all_authenticator` and `allow_all_authorizer` into their own files so that they're constructed like any other authenticator or authorizer. - Delete `auth.hh` and `auth.cc`. Constants and helper functions useful for implementing functionality in the `auth` module have moved to `common.hh`. - Remove silent global dependency in `auth::authenticated_user::is_super()` on the auth* service in favour of a new function `auth::is_super_user()` with an explicit auth* service argument. - Remove global authenticator and authorizer instances, as well as the `setup()` functions. - Expose dependency on the auth* service in `auth::authorizer::authorize()` and `auth::authorizer::list()`, which is necessary to check for superuser status. - Add an explicit `service::migration_manager` argument to the authenticators and authorizers so they can announce metadata tables. - The permissions cache now requires an auth* service reference instead of just an authorizer since authorizing also requires this. - The permissions cache configuration can now easily be created from the DB configuration. - Move the static functions in `auth::auth` to the new `auth::service`. Where possible, previously static resources like the `delayed_tasks` are now members. - Validating `cql3::user_options` requires an authenticator, which was previously accessed globally. - Instances of the auth* service are accessed through `external` instances of `client_state` instead of globally. This includes several CQL statements including `alter_user_statement`, `create_user_statement`, `drop_user_statement`, `grant_statement`, `list_permissions_statement`, `permissions_altering_statement`, and `revoke_statement`. For `internal` `client_state`, this is `nullptr`. - Since the `cql_server` is responsible for instantiating connections and each connection gets a new `client_state`, the `cql_server` is instantiated with a reference to the auth* service. - Similarly, the Thrift server is now also instantiated with a reference to the auth* service. - Since the storage service is responsible for instantiating and starting the sharded servers, it is instantiated with the sharded auth* service which it threads through. All relevant factory functions have been updated. - The storage service is still responsible for starting the auth* service it has been provided, and shutting it down. - The `cql_test_env` is now instantiated with an instance of the auth* service, and can be accessed through a member function. - All unit tests have been updated and pass. Fixes #2929.
This commit is contained in:
41
auth/allow_all_authenticator.cc
Normal file
41
auth/allow_all_authenticator.cc
Normal file
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
const sstring& allow_all_authenticator_name() {
|
||||
static const sstring name = meta::AUTH_PACKAGE_NAME + "AllowAllAuthenticator";
|
||||
return name;
|
||||
}
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authenticator,
|
||||
allow_all_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
}
|
||||
97
auth/allow_all_authenticator.hh
Normal file
97
auth/allow_all_authenticator.hh
Normal file
@@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/common.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
const sstring& allow_all_authenticator_name();
|
||||
|
||||
class allow_all_authenticator final : public authenticator {
|
||||
public:
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::migration_manager&) {
|
||||
}
|
||||
|
||||
future<> start() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> stop() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
const sstring& qualified_java_name() const override {
|
||||
return allow_all_authenticator_name();
|
||||
}
|
||||
|
||||
bool require_authentication() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
option_set supported_options() const override {
|
||||
return option_set();
|
||||
}
|
||||
|
||||
option_set alterable_options() const override {
|
||||
return option_set();
|
||||
}
|
||||
|
||||
future<::shared_ptr<authenticated_user>> authenticate(const credentials_map& credentials) const override {
|
||||
return make_ready_future<::shared_ptr<authenticated_user>>(::make_shared<authenticated_user>());
|
||||
}
|
||||
|
||||
future<> create(sstring username, const option_map& options) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> alter(sstring username, const option_map& options) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> drop(sstring username) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
const resource_ids& protected_resources() const override {
|
||||
static const resource_ids ids;
|
||||
return ids;
|
||||
}
|
||||
|
||||
::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
|
||||
throw std::runtime_error("Should not reach");
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
41
auth/allow_all_authorizer.cc
Normal file
41
auth/allow_all_authorizer.cc
Normal file
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "auth/allow_all_authorizer.hh"
|
||||
|
||||
#include "auth/common.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
const sstring& allow_all_authorizer_name() {
|
||||
static const sstring name = meta::AUTH_PACKAGE_NAME + "AllowAllAuthorizer";
|
||||
return name;
|
||||
}
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authorizer,
|
||||
allow_all_authorizer,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthorizer");
|
||||
|
||||
}
|
||||
98
auth/allow_all_authorizer.hh
Normal file
98
auth/allow_all_authorizer.hh
Normal file
@@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "authorizer.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "stdx.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
class service;
|
||||
|
||||
const sstring& allow_all_authorizer_name();
|
||||
|
||||
class allow_all_authorizer final : public authorizer {
|
||||
public:
|
||||
allow_all_authorizer(cql3::query_processor&, ::service::migration_manager&) {
|
||||
}
|
||||
|
||||
future<> start() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> stop() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
const sstring& qualified_java_name() const override {
|
||||
return allow_all_authorizer_name();
|
||||
}
|
||||
|
||||
future<permission_set> authorize(service&, ::shared_ptr<authenticated_user>, data_resource) const override {
|
||||
return make_ready_future<permission_set>(permissions::ALL);
|
||||
}
|
||||
|
||||
future<> grant(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override {
|
||||
throw exceptions::invalid_request_exception("GRANT operation is not supported by AllowAllAuthorizer");
|
||||
}
|
||||
|
||||
future<> revoke(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override {
|
||||
throw exceptions::invalid_request_exception("REVOKE operation is not supported by AllowAllAuthorizer");
|
||||
}
|
||||
|
||||
future<std::vector<permission_details>> list(
|
||||
service&,
|
||||
::shared_ptr<authenticated_user> performer,
|
||||
permission_set,
|
||||
stdx::optional<data_resource>,
|
||||
stdx::optional<sstring>) const override {
|
||||
throw exceptions::invalid_request_exception("LIST PERMISSIONS operation is not supported by AllowAllAuthorizer");
|
||||
}
|
||||
|
||||
future<> revoke_all(sstring dropped_user) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> revoke_all(data_resource) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
const resource_ids& protected_resources() override {
|
||||
static const resource_ids ids;
|
||||
return ids;
|
||||
}
|
||||
|
||||
future<> validate_configuration() const override {
|
||||
return make_ready_future();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
286
auth/auth.cc
286
auth/auth.cc
@@ -1,286 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include <seastar/core/sleep.hh>
|
||||
|
||||
#include <seastar/core/distributed.hh>
|
||||
|
||||
#include "auth.hh"
|
||||
#include "authenticator.hh"
|
||||
#include "authorizer.hh"
|
||||
#include "common.hh"
|
||||
#include "database.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/statements/raw/cf_statement.hh"
|
||||
#include "cql3/statements/create_table_statement.hh"
|
||||
#include "db/config.hh"
|
||||
#include "delayed_tasks.hh"
|
||||
#include "permissions_cache.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/hash.hh"
|
||||
|
||||
static const sstring USER_NAME("name");
|
||||
static const sstring SUPER("super");
|
||||
|
||||
static logging::logger alogger("auth");
|
||||
|
||||
// TODO: configurable
|
||||
using namespace std::chrono_literals;
|
||||
static const std::chrono::milliseconds SUPERUSER_SETUP_DELAY = 10000ms;
|
||||
|
||||
class auth_migration_listener : public service::migration_listener {
|
||||
void on_create_keyspace(const sstring& ks_name) override {}
|
||||
void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
|
||||
void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
void on_create_view(const sstring& ks_name, const sstring& view_name) override {}
|
||||
|
||||
void on_update_keyspace(const sstring& ks_name) override {}
|
||||
void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
|
||||
void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
|
||||
|
||||
void on_drop_keyspace(const sstring& ks_name) override {
|
||||
auth::authorizer::get().revoke_all(auth::data_resource(ks_name));
|
||||
}
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
auth::authorizer::get().revoke_all(auth::data_resource(ks_name, cf_name));
|
||||
}
|
||||
void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
|
||||
};
|
||||
|
||||
static auth_migration_listener auth_migration;
|
||||
|
||||
static sharded<auth::permissions_cache> perm_cache;
|
||||
|
||||
static future<> start_permission_cache() {
|
||||
auto& db_config = cql3::get_local_query_processor().db().local().get_config();
|
||||
|
||||
auth::permissions_cache_config c;
|
||||
c.max_entries = db_config.permissions_cache_max_entries();
|
||||
c.validity_period = std::chrono::milliseconds(db_config.permissions_validity_in_ms());
|
||||
c.update_period = std::chrono::milliseconds(db_config.permissions_update_interval_in_ms());
|
||||
|
||||
return perm_cache.start(c, std::ref(auth::authorizer::get()), std::ref(alogger));
|
||||
}
|
||||
|
||||
static delayed_tasks<>& get_local_delayed_tasks() {
|
||||
static thread_local delayed_tasks<> instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
void auth::auth::schedule_when_up(scheduled_func f) {
|
||||
get_local_delayed_tasks().schedule_after(SUPERUSER_SETUP_DELAY, std::move(f));
|
||||
}
|
||||
|
||||
future<> auth::auth::setup() {
|
||||
auto& db = cql3::get_local_query_processor().db().local();
|
||||
auto& cfg = db.get_config();
|
||||
|
||||
qualified_name authenticator_name(meta::AUTH_PACKAGE_NAME, cfg.authenticator()),
|
||||
authorizer_name(meta::AUTH_PACKAGE_NAME, cfg.authorizer());
|
||||
|
||||
if (allow_all_authenticator_name() == authenticator_name && allow_all_authorizer_name() == authorizer_name) {
|
||||
return authenticator::setup(authenticator_name).then([authorizer_name = std::move(authorizer_name)] {
|
||||
return authorizer::setup(authorizer_name);
|
||||
}).then([] {
|
||||
return start_permission_cache();
|
||||
});
|
||||
}
|
||||
|
||||
future<> f = make_ready_future<>();
|
||||
|
||||
if (!db.has_keyspace(meta::AUTH_KS)) {
|
||||
std::map<sstring, sstring> opts;
|
||||
opts["replication_factor"] = "1";
|
||||
auto ksm = keyspace_metadata::new_keyspace(meta::AUTH_KS, "org.apache.cassandra.locator.SimpleStrategy", opts, true);
|
||||
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments. See issue #2129.
|
||||
f = service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false);
|
||||
}
|
||||
|
||||
return f.then([] {
|
||||
return setup_table(meta::USERS_CF, sprint("CREATE TABLE %s.%s (%s text, %s boolean, PRIMARY KEY(%s)) WITH gc_grace_seconds=%d",
|
||||
meta::AUTH_KS, meta::USERS_CF, USER_NAME, SUPER, USER_NAME,
|
||||
90 * 24 * 60 * 60)); // 3 months.
|
||||
}).then([authenticator_name = std::move(authenticator_name)] {
|
||||
return authenticator::setup(authenticator_name);
|
||||
}).then([authorizer_name = std::move(authorizer_name)] {
|
||||
return authorizer::setup(authorizer_name);
|
||||
}).then([] {
|
||||
return start_permission_cache();
|
||||
}).then([] {
|
||||
service::get_local_migration_manager().register_listener(&auth_migration); // again, only one shard...
|
||||
// instead of once-timer, just schedule this later
|
||||
schedule_when_up([] {
|
||||
// setup default super user
|
||||
return has_existing_users(meta::USERS_CF, meta::DEFAULT_SUPERUSER_NAME, USER_NAME).then([](bool exists) {
|
||||
if (!exists) {
|
||||
auto query = sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
|
||||
meta::AUTH_KS, meta::USERS_CF, USER_NAME, SUPER);
|
||||
cql3::get_local_query_processor().process(query, db::consistency_level::ONE, {meta::DEFAULT_SUPERUSER_NAME, true}).then([](auto) {
|
||||
alogger.info("Created default superuser '{}'", meta::DEFAULT_SUPERUSER_NAME);
|
||||
}).handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::request_execution_exception&) {
|
||||
alogger.warn("Skipped default superuser setup: some nodes were not ready");
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> auth::auth::shutdown() {
|
||||
// just make sure we don't have pending tasks.
|
||||
// this is mostly relevant for test cases where
|
||||
// db-env-shutdown != process shutdown
|
||||
return smp::invoke_on_all([] {
|
||||
get_local_delayed_tasks().cancel_all();
|
||||
}).then([] {
|
||||
return perm_cache.stop();
|
||||
}).then([] {
|
||||
return when_all_succeed(authorizer::get().stop(), authenticator::get().stop());
|
||||
});
|
||||
}
|
||||
|
||||
future<auth::permission_set> auth::auth::get_permissions(::shared_ptr<authenticated_user> user, data_resource resource) {
|
||||
return perm_cache.local().get(std::move(user), std::move(resource));
|
||||
}
|
||||
|
||||
static db::consistency_level consistency_for_user(const sstring& username) {
|
||||
if (username == auth::meta::DEFAULT_SUPERUSER_NAME) {
|
||||
return db::consistency_level::QUORUM;
|
||||
}
|
||||
return db::consistency_level::LOCAL_ONE;
|
||||
}
|
||||
|
||||
static future<::shared_ptr<cql3::untyped_result_set>> select_user(const sstring& username) {
|
||||
// Here was a thread local, explicit cache of prepared statement. In normal execution this is
|
||||
// fine, but since we in testing set up and tear down system over and over, we'd start using
|
||||
// obsolete prepared statements pretty quickly.
|
||||
// Rely on query processing caching statements instead, and lets assume
|
||||
// that a map lookup string->statement is not gonna kill us much.
|
||||
return cql3::get_local_query_processor().process(
|
||||
sprint("SELECT * FROM %s.%s WHERE %s = ?",
|
||||
auth::meta::AUTH_KS, auth::meta::USERS_CF,
|
||||
USER_NAME), consistency_for_user(username),
|
||||
{ username }, true);
|
||||
}
|
||||
|
||||
future<bool> auth::auth::is_existing_user(const sstring& username) {
|
||||
return select_user(username).then(
|
||||
[](::shared_ptr<cql3::untyped_result_set> res) {
|
||||
return make_ready_future<bool>(!res->empty());
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> auth::auth::is_super_user(const sstring& username) {
|
||||
return select_user(username).then(
|
||||
[](::shared_ptr<cql3::untyped_result_set> res) {
|
||||
return make_ready_future<bool>(!res->empty() && res->one().get_as<bool>(SUPER));
|
||||
});
|
||||
}
|
||||
|
||||
future<> auth::auth::insert_user(const sstring& username, bool is_super) {
|
||||
return cql3::get_local_query_processor().process(sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?)",
|
||||
meta::AUTH_KS, meta::USERS_CF, USER_NAME, SUPER),
|
||||
consistency_for_user(username), { username, is_super }).discard_result();
|
||||
}
|
||||
|
||||
future<> auth::auth::delete_user(const sstring& username) {
|
||||
return cql3::get_local_query_processor().process(sprint("DELETE FROM %s.%s WHERE %s = ?",
|
||||
meta::AUTH_KS, meta::USERS_CF, USER_NAME),
|
||||
consistency_for_user(username), { username }).discard_result();
|
||||
}
|
||||
|
||||
future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
|
||||
auto& qp = cql3::get_local_query_processor();
|
||||
auto& db = qp.db().local();
|
||||
|
||||
if (db.has_schema(meta::AUTH_KS, name)) {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::raw::cf_statement> parsed = static_pointer_cast<
|
||||
cql3::statements::raw::cf_statement>(cql3::query_processor::parse_statement(cql));
|
||||
parsed->prepare_keyspace(meta::AUTH_KS);
|
||||
::shared_ptr<cql3::statements::create_table_statement> statement =
|
||||
static_pointer_cast<cql3::statements::create_table_statement>(
|
||||
parsed->prepare(db, qp.get_cql_stats())->statement);
|
||||
auto schema = statement->get_cf_meta_data();
|
||||
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
|
||||
|
||||
schema_builder b(schema);
|
||||
b.set_uuid(uuid);
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
|
||||
}
|
||||
|
||||
future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {
|
||||
auto default_user_query = sprint("SELECT * FROM %s.%s WHERE %s = ?", meta::AUTH_KS, cfname, name_column);
|
||||
auto all_users_query = sprint("SELECT * FROM %s.%s LIMIT 1", meta::AUTH_KS, cfname);
|
||||
|
||||
return cql3::get_local_query_processor().process(default_user_query, db::consistency_level::ONE, { def_user_name }).then([=](::shared_ptr<cql3::untyped_result_set> res) {
|
||||
if (!res->empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
return cql3::get_local_query_processor().process(default_user_query, db::consistency_level::QUORUM, { def_user_name }).then([all_users_query](::shared_ptr<cql3::untyped_result_set> res) {
|
||||
if (!res->empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
return cql3::get_local_query_processor().process(all_users_query, db::consistency_level::QUORUM).then([](::shared_ptr<cql3::untyped_result_set> res) {
|
||||
return make_ready_future<bool>(!res->empty());
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
117
auth/auth.hh
117
auth/auth.hh
@@ -1,117 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "permission.hh"
|
||||
#include "data_resource.hh"
|
||||
#include "authenticated_user.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
class auth {
|
||||
public:
|
||||
class permissions_cache;
|
||||
|
||||
static future<permission_set> get_permissions(::shared_ptr<authenticated_user>, data_resource);
|
||||
|
||||
/**
|
||||
* Checks if the username is stored in AUTH_KS.USERS_CF.
|
||||
*
|
||||
* @param username Username to query.
|
||||
* @return whether or not Cassandra knows about the user.
|
||||
*/
|
||||
static future<bool> is_existing_user(const sstring& username);
|
||||
|
||||
/**
|
||||
* Checks if the user is a known superuser.
|
||||
*
|
||||
* @param username Username to query.
|
||||
* @return true is the user is a superuser, false if they aren't or don't exist at all.
|
||||
*/
|
||||
static future<bool> is_super_user(const sstring& username);
|
||||
|
||||
/**
|
||||
* Inserts the user into AUTH_KS.USERS_CF (or overwrites their superuser status as a result of an ALTER USER query).
|
||||
*
|
||||
* @param username Username to insert.
|
||||
* @param isSuper User's new status.
|
||||
* @throws RequestExecutionException
|
||||
*/
|
||||
static future<> insert_user(const sstring& username, bool is_super);
|
||||
|
||||
/**
|
||||
* Deletes the user from AUTH_KS.USERS_CF.
|
||||
*
|
||||
* @param username Username to delete.
|
||||
* @throws RequestExecutionException
|
||||
*/
|
||||
static future<> delete_user(const sstring& username);
|
||||
|
||||
/**
|
||||
* Sets up Authenticator and Authorizer.
|
||||
*/
|
||||
static future<> setup();
|
||||
static future<> shutdown();
|
||||
|
||||
/**
|
||||
* Set up table from given CREATE TABLE statement under system_auth keyspace, if not already done so.
|
||||
*
|
||||
* @param name name of the table
|
||||
* @param cql CREATE TABLE statement
|
||||
*/
|
||||
static future<> setup_table(const sstring& name, const sstring& cql);
|
||||
|
||||
static future<bool> has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column_name);
|
||||
|
||||
// For internal use. Run function "when system is up".
|
||||
typedef std::function<future<>()> scheduled_func;
|
||||
static void schedule_when_up(scheduled_func);
|
||||
};
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const std::pair<auth::authenticated_user, auth::data_resource>& p);
|
||||
@@ -41,7 +41,6 @@
|
||||
|
||||
|
||||
#include "authenticated_user.hh"
|
||||
#include "auth.hh"
|
||||
|
||||
const sstring auth::authenticated_user::ANONYMOUS_USERNAME("anonymous");
|
||||
|
||||
@@ -60,13 +59,6 @@ const sstring& auth::authenticated_user::name() const {
|
||||
return _anon ? ANONYMOUS_USERNAME : _name;
|
||||
}
|
||||
|
||||
future<bool> auth::authenticated_user::is_super() const {
|
||||
if (is_anonymous()) {
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
return auth::auth::is_super_user(_name);
|
||||
}
|
||||
|
||||
bool auth::authenticated_user::operator==(const authenticated_user& v) const {
|
||||
return _anon ? v._anon : _name == v._name;
|
||||
}
|
||||
|
||||
@@ -58,14 +58,6 @@ public:
|
||||
|
||||
const sstring& name() const;
|
||||
|
||||
/**
|
||||
* Checks the user's superuser status.
|
||||
* Only a superuser is allowed to perform CREATE USER and DROP USER queries.
|
||||
* Im most cased, though not necessarily, a superuser will have Permission.ALL on every resource
|
||||
* (depends on IAuthorizer implementation).
|
||||
*/
|
||||
future<bool> is_super() const;
|
||||
|
||||
/**
|
||||
* If IAuthenticator doesn't require authentication, this method may return true.
|
||||
*/
|
||||
|
||||
@@ -43,7 +43,6 @@
|
||||
#include "authenticated_user.hh"
|
||||
#include "common.hh"
|
||||
#include "password_authenticator.hh"
|
||||
#include "auth.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "db/config.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
@@ -51,11 +50,6 @@
|
||||
const sstring auth::authenticator::USERNAME_KEY("username");
|
||||
const sstring auth::authenticator::PASSWORD_KEY("password");
|
||||
|
||||
const sstring& auth::allow_all_authenticator_name() {
|
||||
static const sstring name = meta::AUTH_PACKAGE_NAME + "AllowAllAuthenticator";
|
||||
return name;
|
||||
}
|
||||
|
||||
auth::authenticator::option auth::authenticator::string_to_option(const sstring& name) {
|
||||
if (strcasecmp(name.c_str(), "password") == 0) {
|
||||
return option::PASSWORD;
|
||||
@@ -71,70 +65,3 @@ sstring auth::authenticator::option_to_string(option opt) {
|
||||
throw std::invalid_argument(sprint("Unknown option {}", opt));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticator is assumed to be a fully state-less immutable object (note all the const).
|
||||
* We thus store a single instance globally, since it should be safe/ok.
|
||||
*/
|
||||
static std::unique_ptr<auth::authenticator> global_authenticator;
|
||||
|
||||
using authenticator_registry = class_registry<auth::authenticator, cql3::query_processor&>;
|
||||
|
||||
future<>
|
||||
auth::authenticator::setup(const sstring& type) {
|
||||
if (type == allow_all_authenticator_name()) {
|
||||
class allow_all_authenticator : public authenticator {
|
||||
public:
|
||||
future<> start() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> stop() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
const sstring& qualified_java_name() const override {
|
||||
return allow_all_authenticator_name();
|
||||
}
|
||||
bool require_authentication() const override {
|
||||
return false;
|
||||
}
|
||||
option_set supported_options() const override {
|
||||
return option_set();
|
||||
}
|
||||
option_set alterable_options() const override {
|
||||
return option_set();
|
||||
}
|
||||
future<::shared_ptr<authenticated_user>> authenticate(const credentials_map& credentials) const override {
|
||||
return make_ready_future<::shared_ptr<authenticated_user>>(::make_shared<authenticated_user>());
|
||||
}
|
||||
future<> create(sstring username, const option_map& options) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
future<> alter(sstring username, const option_map& options) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
future<> drop(sstring username) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
const resource_ids& protected_resources() const override {
|
||||
static const resource_ids ids;
|
||||
return ids;
|
||||
}
|
||||
::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
|
||||
throw std::runtime_error("Should not reach");
|
||||
}
|
||||
};
|
||||
global_authenticator = std::make_unique<allow_all_authenticator>();
|
||||
return make_ready_future();
|
||||
} else {
|
||||
auto a = authenticator_registry::create(type, cql3::get_local_query_processor());
|
||||
auto f = a->start();
|
||||
return f.then([a = std::move(a)]() mutable {
|
||||
global_authenticator = std::move(a);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
auth::authenticator& auth::authenticator::get() {
|
||||
assert(global_authenticator);
|
||||
return *global_authenticator;
|
||||
}
|
||||
|
||||
@@ -65,8 +65,6 @@ namespace auth {
|
||||
|
||||
class authenticated_user;
|
||||
|
||||
const sstring& allow_all_authenticator_name();
|
||||
|
||||
class authenticator {
|
||||
public:
|
||||
static const sstring USERNAME_KEY;
|
||||
@@ -87,19 +85,6 @@ public:
|
||||
using option_map = std::unordered_map<option, boost::any, enum_hash<option>>;
|
||||
using credentials_map = std::unordered_map<sstring, sstring>;
|
||||
|
||||
/**
|
||||
* Setup is called once upon system startup to initialize the IAuthenticator.
|
||||
*
|
||||
* For example, use this method to create any required keyspaces/column families.
|
||||
* Note: Only call from main thread.
|
||||
*/
|
||||
static future<> setup(const sstring& type);
|
||||
|
||||
/**
|
||||
* Returns the system authenticator. Must have called setup before calling this.
|
||||
*/
|
||||
static authenticator& get();
|
||||
|
||||
virtual ~authenticator()
|
||||
{}
|
||||
|
||||
|
||||
@@ -55,6 +55,8 @@
|
||||
|
||||
namespace auth {
|
||||
|
||||
class service;
|
||||
|
||||
class authenticated_user;
|
||||
|
||||
struct permission_details {
|
||||
@@ -69,8 +71,6 @@ struct permission_details {
|
||||
|
||||
using std::experimental::optional;
|
||||
|
||||
const sstring& allow_all_authorizer_name();
|
||||
|
||||
class authorizer {
|
||||
public:
|
||||
virtual ~authorizer() {}
|
||||
@@ -88,7 +88,7 @@ public:
|
||||
* @param resource Resource for which the authorization is being requested. @see DataResource.
|
||||
* @return Set of permissions of the user on the resource. Should never return empty. Use permission.NONE instead.
|
||||
*/
|
||||
virtual future<permission_set> authorize(::shared_ptr<authenticated_user>, data_resource) const = 0;
|
||||
virtual future<permission_set> authorize(service&, ::shared_ptr<authenticated_user>, data_resource) const = 0;
|
||||
|
||||
/**
|
||||
* Grants a set of permissions on a resource to a user.
|
||||
@@ -132,7 +132,7 @@ public:
|
||||
* @throws RequestValidationException
|
||||
* @throws RequestExecutionException
|
||||
*/
|
||||
virtual future<std::vector<permission_details>> list(::shared_ptr<authenticated_user> performer, permission_set, optional<data_resource>, optional<sstring>) const = 0;
|
||||
virtual future<std::vector<permission_details>> list(service&, ::shared_ptr<authenticated_user> performer, permission_set, optional<data_resource>, optional<sstring>) const = 0;
|
||||
|
||||
/**
|
||||
* This method is called before deleting a user with DROP USER query so that a new user with the same
|
||||
@@ -162,18 +162,6 @@ public:
|
||||
* @throws ConfigurationException when there is a configuration error.
|
||||
*/
|
||||
virtual future<> validate_configuration() const = 0;
|
||||
|
||||
/**
|
||||
* Setup is called once upon system startup to initialize the IAuthorizer.
|
||||
*
|
||||
* For example, use this method to create any required keyspaces/column families.
|
||||
*/
|
||||
static future<> setup(const sstring& type);
|
||||
|
||||
/**
|
||||
* Returns the system authorizer. Must have called setup before calling this.
|
||||
*/
|
||||
static authorizer& get();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -21,6 +21,13 @@
|
||||
|
||||
#include "auth/common.hh"
|
||||
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/statements/create_table_statement.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
namespace meta {
|
||||
@@ -32,4 +39,32 @@ const sstring AUTH_PACKAGE_NAME("org.apache.cassandra.auth.");
|
||||
|
||||
}
|
||||
|
||||
future<> create_metadata_table_if_missing(
|
||||
const sstring& table_name,
|
||||
cql3::query_processor& qp,
|
||||
const sstring& cql,
|
||||
::service::migration_manager& mm) {
|
||||
auto& db = qp.db().local();
|
||||
|
||||
if (db.has_schema(meta::AUTH_KS, table_name)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto parsed_statement = static_pointer_cast<cql3::statements::raw::cf_statement>(
|
||||
cql3::query_processor::parse_statement(cql));
|
||||
|
||||
parsed_statement->prepare_keyspace(meta::AUTH_KS);
|
||||
|
||||
auto statement = static_pointer_cast<cql3::statements::create_table_statement>(
|
||||
parsed_statement->prepare(db, qp.get_cql_stats())->statement);
|
||||
|
||||
const auto schema = statement->get_cf_meta_data();
|
||||
const auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
|
||||
|
||||
schema_builder b(schema);
|
||||
b.set_uuid(uuid);
|
||||
|
||||
return mm.announce_new_column_family(b.build(), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -21,10 +21,24 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include <seastar/core/resource.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "delayed_tasks.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
namespace meta {
|
||||
@@ -36,4 +50,25 @@ extern const sstring AUTH_PACKAGE_NAME;
|
||||
|
||||
}
|
||||
|
||||
template <class Task>
|
||||
future<> once_among_shards(Task&& f) {
|
||||
if (engine().cpu_id() == 0u) {
|
||||
return f();
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
template <class Task, class Clock>
|
||||
void delay_until_system_ready(delayed_tasks<Clock>& ts, Task&& f) {
|
||||
static const typename std::chrono::milliseconds delay_duration(10000);
|
||||
ts.schedule_after(delay_duration, std::forward<Task>(f));
|
||||
}
|
||||
|
||||
future<> create_metadata_table_if_missing(
|
||||
const sstring& table_name,
|
||||
cql3::query_processor&,
|
||||
const sstring& cql,
|
||||
::service::migration_manager&);
|
||||
|
||||
}
|
||||
|
||||
@@ -46,7 +46,6 @@
|
||||
|
||||
#include <seastar/core/reactor.hh>
|
||||
|
||||
#include "auth.hh"
|
||||
#include "common.hh"
|
||||
#include "default_authorizer.hh"
|
||||
#include "authenticated_user.hh"
|
||||
@@ -69,18 +68,22 @@ static const sstring PERMISSIONS_CF = "permissions";
|
||||
static logging::logger alogger("default_authorizer");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<auth::authorizer, auth::default_authorizer, cql3::query_processor&> password_auth_reg(
|
||||
"org.apache.cassandra.auth.CassandraAuthorizer");
|
||||
static const class_registrator<
|
||||
auth::authorizer,
|
||||
auth::default_authorizer,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.CassandraAuthorizer");
|
||||
|
||||
auth::default_authorizer::default_authorizer(cql3::query_processor& qp)
|
||||
: _qp(qp) {
|
||||
auth::default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::migration_manager& mm)
|
||||
: _qp(qp)
|
||||
, _migration_manager(mm) {
|
||||
}
|
||||
|
||||
auth::default_authorizer::~default_authorizer() {
|
||||
}
|
||||
|
||||
future<> auth::default_authorizer::start() {
|
||||
sstring create_table = sprint("CREATE TABLE %s.%s ("
|
||||
static const sstring create_table = sprint("CREATE TABLE %s.%s ("
|
||||
"%s text,"
|
||||
"%s text,"
|
||||
"%s set<text>,"
|
||||
@@ -89,7 +92,13 @@ future<> auth::default_authorizer::start() {
|
||||
PERMISSIONS_CF, USER_NAME, RESOURCE_NAME, PERMISSIONS_NAME,
|
||||
USER_NAME, RESOURCE_NAME, 90 * 24 * 60 * 60); // 3 months.
|
||||
|
||||
return auth::setup_table(PERMISSIONS_CF, create_table);
|
||||
return auth::once_among_shards([this] {
|
||||
return auth::create_metadata_table_if_missing(
|
||||
PERMISSIONS_CF,
|
||||
_qp,
|
||||
create_table,
|
||||
_migration_manager);
|
||||
});
|
||||
}
|
||||
|
||||
future<> auth::default_authorizer::stop() {
|
||||
@@ -97,8 +106,8 @@ future<> auth::default_authorizer::stop() {
|
||||
}
|
||||
|
||||
future<auth::permission_set> auth::default_authorizer::authorize(
|
||||
::shared_ptr<authenticated_user> user, data_resource resource) const {
|
||||
return user->is_super().then([this, user, resource = std::move(resource)](bool is_super) {
|
||||
service& ser, ::shared_ptr<authenticated_user> user, data_resource resource) const {
|
||||
return auth::is_super_user(ser, *user).then([this, user, resource = std::move(resource)](bool is_super) {
|
||||
if (is_super) {
|
||||
return make_ready_future<permission_set>(permissions::ALL);
|
||||
}
|
||||
@@ -153,9 +162,9 @@ future<> auth::default_authorizer::revoke(
|
||||
}
|
||||
|
||||
future<std::vector<auth::permission_details>> auth::default_authorizer::list(
|
||||
::shared_ptr<authenticated_user> performer, permission_set set,
|
||||
service& ser, ::shared_ptr<authenticated_user> performer, permission_set set,
|
||||
optional<data_resource> resource, optional<sstring> user) const {
|
||||
return performer->is_super().then([this, performer, set = std::move(set), resource = std::move(resource), user = std::move(user)](bool is_super) {
|
||||
return auth::is_super_user(ser, *performer).then([this, performer, set = std::move(set), resource = std::move(resource), user = std::move(user)](bool is_super) {
|
||||
if (!is_super && (!user || performer->name() != *user)) {
|
||||
throw exceptions::unauthorized_exception(sprint("You are not authorized to view %s's permissions", user ? *user : "everyone"));
|
||||
}
|
||||
|
||||
@@ -41,8 +41,11 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include "authorizer.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -51,8 +54,10 @@ const sstring& default_authorizer_name();
|
||||
class default_authorizer : public authorizer {
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
::service::migration_manager& _migration_manager;
|
||||
|
||||
public:
|
||||
default_authorizer(cql3::query_processor&);
|
||||
default_authorizer(cql3::query_processor&, ::service::migration_manager&);
|
||||
~default_authorizer();
|
||||
|
||||
future<> start() override;
|
||||
@@ -63,13 +68,13 @@ public:
|
||||
return default_authorizer_name();
|
||||
}
|
||||
|
||||
future<permission_set> authorize(::shared_ptr<authenticated_user>, data_resource) const override;
|
||||
future<permission_set> authorize(service&, ::shared_ptr<authenticated_user>, data_resource) const override;
|
||||
|
||||
future<> grant(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override;
|
||||
|
||||
future<> revoke(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override;
|
||||
|
||||
future<std::vector<permission_details>> list(::shared_ptr<authenticated_user>, permission_set, optional<data_resource>, optional<sstring>) const override;
|
||||
future<std::vector<permission_details>> list(service&, ::shared_ptr<authenticated_user>, permission_set, optional<data_resource>, optional<sstring>) const override;
|
||||
|
||||
future<> revoke_all(sstring) override;
|
||||
|
||||
|
||||
@@ -46,12 +46,12 @@
|
||||
|
||||
#include <seastar/core/reactor.hh>
|
||||
|
||||
#include "auth.hh"
|
||||
#include "common.hh"
|
||||
#include "password_authenticator.hh"
|
||||
#include "authenticated_user.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "log.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
const sstring& auth::password_authenticator_name() {
|
||||
@@ -69,14 +69,18 @@ static const sstring CREDENTIALS_CF = "credentials";
|
||||
static logging::logger plogger("password_authenticator");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<auth::authenticator, auth::password_authenticator, cql3::query_processor&> password_auth_reg(
|
||||
"org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
static const class_registrator<
|
||||
auth::authenticator,
|
||||
auth::password_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
|
||||
auth::password_authenticator::~password_authenticator()
|
||||
{}
|
||||
|
||||
auth::password_authenticator::password_authenticator(cql3::query_processor& qp)
|
||||
: _qp(qp) {
|
||||
auth::password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
|
||||
: _qp(qp)
|
||||
, _migration_manager(mm) {
|
||||
}
|
||||
|
||||
// TODO: blowfish
|
||||
@@ -150,33 +154,42 @@ static sstring hashpw(const sstring& pass) {
|
||||
}
|
||||
|
||||
future<> auth::password_authenticator::start() {
|
||||
gensalt(); // do this once to determine usable hashing
|
||||
return auth::once_among_shards([this] {
|
||||
gensalt(); // do this once to determine usable hashing
|
||||
|
||||
sstring create_table = sprint(
|
||||
"CREATE TABLE %s.%s ("
|
||||
"%s text,"
|
||||
"%s text," // salt + hash + number of rounds
|
||||
"options map<text,text>,"// for future extensions
|
||||
"PRIMARY KEY(%s)"
|
||||
") WITH gc_grace_seconds=%d",
|
||||
meta::AUTH_KS,
|
||||
CREDENTIALS_CF, USER_NAME, SALTED_HASH, USER_NAME,
|
||||
90 * 24 * 60 * 60); // 3 months.
|
||||
static const sstring create_table = sprint(
|
||||
"CREATE TABLE %s.%s ("
|
||||
"%s text,"
|
||||
"%s text," // salt + hash + number of rounds
|
||||
"options map<text,text>,"// for future extensions
|
||||
"PRIMARY KEY(%s)"
|
||||
") WITH gc_grace_seconds=%d",
|
||||
meta::AUTH_KS,
|
||||
CREDENTIALS_CF, USER_NAME, SALTED_HASH, USER_NAME,
|
||||
90 * 24 * 60 * 60); // 3 months.
|
||||
|
||||
return auth::setup_table(CREDENTIALS_CF, create_table).then([this] {
|
||||
// instead of once-timer, just schedule this later
|
||||
auth::schedule_when_up([this] {
|
||||
return auth::has_existing_users(CREDENTIALS_CF, DEFAULT_USER_NAME, USER_NAME).then([this](bool exists) {
|
||||
if (!exists) {
|
||||
_qp.process(sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
|
||||
meta::AUTH_KS,
|
||||
CREDENTIALS_CF,
|
||||
USER_NAME, SALTED_HASH
|
||||
),
|
||||
db::consistency_level::ONE, {DEFAULT_USER_NAME, hashpw(DEFAULT_USER_PASSWORD)}).then([](auto) {
|
||||
plogger.info("Created default user '{}'", DEFAULT_USER_NAME);
|
||||
});
|
||||
}
|
||||
return auth::create_metadata_table_if_missing(
|
||||
CREDENTIALS_CF,
|
||||
_qp,
|
||||
create_table,
|
||||
_migration_manager).then([this] {
|
||||
auth::delay_until_system_ready(_delayed, [this] {
|
||||
return has_existing_users().then([this](bool existing) {
|
||||
if (!existing) {
|
||||
return _qp.process(
|
||||
sprint(
|
||||
"INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
|
||||
meta::AUTH_KS,
|
||||
CREDENTIALS_CF,
|
||||
USER_NAME, SALTED_HASH),
|
||||
db::consistency_level::ONE,
|
||||
{ DEFAULT_USER_NAME, hashpw(DEFAULT_USER_PASSWORD) }).then([](auto) {
|
||||
plogger.info("Created default user '{}'", DEFAULT_USER_NAME);
|
||||
});
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -288,8 +301,10 @@ const auth::resource_ids& auth::password_authenticator::protected_resources() co
|
||||
|
||||
::shared_ptr<auth::authenticator::sasl_challenge> auth::password_authenticator::new_sasl_challenge() const {
|
||||
class plain_text_password_challenge: public sasl_challenge {
|
||||
const password_authenticator& _self;
|
||||
|
||||
public:
|
||||
plain_text_password_challenge()
|
||||
plain_text_password_challenge(const password_authenticator& self) : _self(self)
|
||||
{}
|
||||
|
||||
/**
|
||||
@@ -344,11 +359,58 @@ const auth::resource_ids& auth::password_authenticator::protected_resources() co
|
||||
return _complete;
|
||||
}
|
||||
future<::shared_ptr<authenticated_user>> get_authenticated_user() const override {
|
||||
return authenticator::get().authenticate(_credentials);
|
||||
return _self.authenticate(_credentials);
|
||||
}
|
||||
private:
|
||||
credentials_map _credentials;
|
||||
bool _complete = false;
|
||||
};
|
||||
return ::make_shared<plain_text_password_challenge>();
|
||||
return ::make_shared<plain_text_password_challenge>(*this);
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// Similar in structure to `auth::service::has_existing_users()`, but trying to generalize the pattern breaks all kinds
|
||||
// of module boundaries and leaks implementation details.
|
||||
//
|
||||
future<bool> auth::password_authenticator::has_existing_users() const {
|
||||
static const sstring default_user_query = sprint(
|
||||
"SELECT * FROM %s.%s WHERE %s = ?",
|
||||
meta::AUTH_KS,
|
||||
CREDENTIALS_CF,
|
||||
USER_NAME);
|
||||
|
||||
static const sstring all_users_query = sprint(
|
||||
"SELECT * FROM %s.%s LIMIT 1",
|
||||
meta::AUTH_KS,
|
||||
CREDENTIALS_CF);
|
||||
|
||||
// This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we
|
||||
// can potentially avoid doing a range query with a high consistency level.
|
||||
|
||||
return _qp.process(
|
||||
default_user_query,
|
||||
db::consistency_level::ONE,
|
||||
{ meta::DEFAULT_SUPERUSER_NAME },
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
return _qp.process(
|
||||
default_user_query,
|
||||
db::consistency_level::QUORUM,
|
||||
{ meta::DEFAULT_SUPERUSER_NAME },
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
return _qp.process(
|
||||
all_users_query,
|
||||
db::consistency_level::QUORUM).then([](auto results) {
|
||||
return make_ready_future<bool>(!results->empty());
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -43,6 +43,11 @@
|
||||
|
||||
#include "authenticator.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "delayed_tasks.hh"
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -51,8 +56,12 @@ const sstring& password_authenticator_name();
|
||||
class password_authenticator : public authenticator {
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
::service::migration_manager& _migration_manager;
|
||||
|
||||
delayed_tasks<> _delayed{};
|
||||
|
||||
public:
|
||||
password_authenticator(cql3::query_processor&);
|
||||
password_authenticator(cql3::query_processor&, ::service::migration_manager&);
|
||||
~password_authenticator();
|
||||
|
||||
future<> start() override;
|
||||
@@ -72,6 +81,9 @@ public:
|
||||
|
||||
|
||||
static db::consistency_level consistency_for_user(const sstring& username);
|
||||
|
||||
private:
|
||||
future<bool> has_existing_users() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -21,12 +21,26 @@
|
||||
|
||||
#include "auth/permissions_cache.hh"
|
||||
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
permissions_cache::permissions_cache(const permissions_cache_config& c, authorizer& a, logging::logger& log)
|
||||
: _cache(c.max_entries, c.validity_period, c.update_period, log, [&a, &log](const key_type& k) {
|
||||
permissions_cache_config permissions_cache_config::from_db_config(const db::config& dc) {
|
||||
permissions_cache_config c;
|
||||
c.max_entries = dc.permissions_cache_max_entries();
|
||||
c.validity_period = std::chrono::milliseconds(dc.permissions_validity_in_ms());
|
||||
c.update_period = std::chrono::milliseconds(dc.permissions_update_interval_in_ms());
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
permissions_cache::permissions_cache(const permissions_cache_config& c, service& ser, logging::logger& log)
|
||||
: _cache(c.max_entries, c.validity_period, c.update_period, log, [&ser, &log](const key_type& k) {
|
||||
log.debug("Refreshing permissions for {}", k.first.name());
|
||||
return a.authorize(::make_shared<authenticated_user>(k.first), k.second);
|
||||
return ser.underlying_authorizer().authorize(ser, ::make_shared<authenticated_user>(k.first), k.second);
|
||||
}) {
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/data_resource.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "log.hh"
|
||||
@@ -59,9 +58,17 @@ inline std::ostream& operator<<(std::ostream& os, const std::pair<auth::authenti
|
||||
|
||||
}
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
class service;
|
||||
|
||||
struct permissions_cache_config final {
|
||||
static permissions_cache_config from_db_config(const db::config&);
|
||||
|
||||
std::size_t max_entries;
|
||||
std::chrono::milliseconds validity_period;
|
||||
std::chrono::milliseconds update_period;
|
||||
@@ -80,7 +87,7 @@ class permissions_cache final {
|
||||
cache_type _cache;
|
||||
|
||||
public:
|
||||
explicit permissions_cache(const permissions_cache_config&, authorizer&, logging::logger&);
|
||||
explicit permissions_cache(const permissions_cache_config&, service&, logging::logger&);
|
||||
|
||||
future<> start() {
|
||||
return make_ready_future<>();
|
||||
|
||||
353
auth/service.cc
Normal file
353
auth/service.cc
Normal file
@@ -0,0 +1,353 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "auth/service.hh"
|
||||
|
||||
#include <map>
|
||||
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
#include "auth/allow_all_authorizer.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/consistency_level.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "log.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
namespace meta {
|
||||
|
||||
static const sstring user_name_col_name("name");
|
||||
static const sstring superuser_col_name("super");
|
||||
|
||||
}
|
||||
|
||||
static logging::logger log("auth_service");
|
||||
|
||||
class auth_migration_listener final : public ::service::migration_listener {
|
||||
authorizer& _authorizer;
|
||||
|
||||
public:
|
||||
explicit auth_migration_listener(authorizer& a) : _authorizer(a) {
|
||||
}
|
||||
|
||||
private:
|
||||
void on_create_keyspace(const sstring& ks_name) override {}
|
||||
void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
|
||||
void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
void on_create_view(const sstring& ks_name, const sstring& view_name) override {}
|
||||
|
||||
void on_update_keyspace(const sstring& ks_name) override {}
|
||||
void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
|
||||
void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
|
||||
|
||||
void on_drop_keyspace(const sstring& ks_name) override {
|
||||
_authorizer.revoke_all(auth::data_resource(ks_name));
|
||||
}
|
||||
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
_authorizer.revoke_all(auth::data_resource(ks_name, cf_name));
|
||||
}
|
||||
|
||||
void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
|
||||
};
|
||||
|
||||
static sharded<permissions_cache> sharded_permissions_cache{};
|
||||
|
||||
static db::consistency_level consistency_for_user(const sstring& name) {
|
||||
if (name == meta::DEFAULT_SUPERUSER_NAME) {
|
||||
return db::consistency_level::QUORUM;
|
||||
} else {
|
||||
return db::consistency_level::LOCAL_ONE;
|
||||
}
|
||||
}
|
||||
|
||||
static future<::shared_ptr<cql3::untyped_result_set>> select_user(cql3::query_processor& qp, const sstring& name) {
|
||||
// Here was a thread local, explicit cache of prepared statement. In normal execution this is
|
||||
// fine, but since we in testing set up and tear down system over and over, we'd start using
|
||||
// obsolete prepared statements pretty quickly.
|
||||
// Rely on query processing caching statements instead, and lets assume
|
||||
// that a map lookup string->statement is not gonna kill us much.
|
||||
return qp.process(
|
||||
sprint(
|
||||
"SELECT * FROM %s.%s WHERE %s = ?",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF,
|
||||
meta::user_name_col_name),
|
||||
consistency_for_user(name),
|
||||
{ name },
|
||||
true);
|
||||
}
|
||||
|
||||
service_config service_config::from_db_config(const db::config& dc) {
|
||||
const qualified_name qualified_authorizer_name(meta::AUTH_PACKAGE_NAME, dc.authorizer());
|
||||
const qualified_name qualified_authenticator_name(meta::AUTH_PACKAGE_NAME, dc.authenticator());
|
||||
|
||||
service_config c;
|
||||
c.authorizer_java_name = qualified_authorizer_name;
|
||||
c.authenticator_java_name = qualified_authenticator_name;
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
service::service(
|
||||
permissions_cache_config c,
|
||||
cql3::query_processor& qp,
|
||||
::service::migration_manager& mm,
|
||||
std::unique_ptr<authorizer> a,
|
||||
std::unique_ptr<authenticator> b)
|
||||
: _cache_config(std::move(c))
|
||||
, _qp(qp)
|
||||
, _migration_manager(mm)
|
||||
, _authorizer(std::move(a))
|
||||
, _authenticator(std::move(b))
|
||||
, _migration_listener(std::make_unique<auth_migration_listener>(*_authorizer)) {
|
||||
}
|
||||
|
||||
service::service(
|
||||
permissions_cache_config cache_config,
|
||||
cql3::query_processor& qp,
|
||||
::service::migration_manager& mm,
|
||||
const service_config& sc)
|
||||
: service(
|
||||
std::move(cache_config),
|
||||
qp,
|
||||
mm,
|
||||
create_object<authorizer>(sc.authorizer_java_name, qp, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, mm)) {
|
||||
}
|
||||
|
||||
bool service::should_create_metadata() const {
|
||||
const bool null_authorizer = _authorizer->qualified_java_name() == allow_all_authorizer_name();
|
||||
const bool null_authenticator = _authenticator->qualified_java_name() == allow_all_authenticator_name();
|
||||
return !null_authorizer || !null_authenticator;
|
||||
}
|
||||
|
||||
future<> service::create_metadata_if_missing() {
|
||||
auto& db = _qp.db().local();
|
||||
|
||||
auto f = make_ready_future<>();
|
||||
|
||||
if (!db.has_keyspace(meta::AUTH_KS)) {
|
||||
std::map<sstring, sstring> opts{{"replication_factor", "1"}};
|
||||
|
||||
auto ksm = keyspace_metadata::new_keyspace(
|
||||
meta::AUTH_KS,
|
||||
"org.apache.cassandra.locator.SimpleStrategy",
|
||||
opts,
|
||||
true);
|
||||
|
||||
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments.
|
||||
// See issue #2129.
|
||||
f = _migration_manager.announce_new_keyspace(ksm, api::min_timestamp, false);
|
||||
}
|
||||
|
||||
return f.then([this] {
|
||||
// 3 months.
|
||||
static const auto gc_grace_seconds = 90 * 24 * 60 * 60;
|
||||
|
||||
static const sstring users_table_query = sprint(
|
||||
"CREATE TABLE %s.%s (%s text, %s boolean, PRIMARY KEY (%s)) WITH gc_grace_seconds=%s",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF,
|
||||
meta::user_name_col_name,
|
||||
meta::superuser_col_name,
|
||||
meta::user_name_col_name,
|
||||
gc_grace_seconds);
|
||||
|
||||
return create_metadata_table_if_missing(
|
||||
meta::USERS_CF,
|
||||
_qp,
|
||||
users_table_query,
|
||||
_migration_manager);
|
||||
}).then([this] {
|
||||
delay_until_system_ready(_delayed, [this] {
|
||||
return has_existing_users().then([this](bool existing) {
|
||||
if (!existing) {
|
||||
//
|
||||
// Create default superuser.
|
||||
//
|
||||
|
||||
static const sstring query = sprint(
|
||||
"INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF,
|
||||
meta::user_name_col_name,
|
||||
meta::superuser_col_name);
|
||||
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
{ meta::DEFAULT_SUPERUSER_NAME, true }).then([](auto&&) {
|
||||
log.info("Created default superuser '{}'", meta::DEFAULT_SUPERUSER_NAME);
|
||||
}).handle_exception([](auto exn) {
|
||||
try {
|
||||
std::rethrow_exception(exn);
|
||||
} catch (const exceptions::request_execution_exception&) {
|
||||
log.warn("Skipped default superuser setup: some nodes were not ready");
|
||||
}
|
||||
}).discard_result();
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
future<> service::start() {
|
||||
return once_among_shards([this] {
|
||||
if (should_create_metadata()) {
|
||||
return create_metadata_if_missing();
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}).then([this] {
|
||||
return when_all_succeed(_authorizer->start(), _authenticator->start());
|
||||
}).then([this] {
|
||||
return once_among_shards([this] {
|
||||
_migration_manager.register_listener(_migration_listener.get());
|
||||
return sharded_permissions_cache.start(std::ref(_cache_config), std::ref(*this), std::ref(log));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> service::stop() {
|
||||
return once_among_shards([this] {
|
||||
_delayed.cancel_all();
|
||||
return sharded_permissions_cache.stop();
|
||||
}).then([this] {
|
||||
return when_all_succeed(_authorizer->stop(), _authenticator->stop());
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> service::has_existing_users() const {
|
||||
static const sstring default_user_query = sprint(
|
||||
"SELECT * FROM %s.%s WHERE %s = ?",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF,
|
||||
meta::user_name_col_name);
|
||||
|
||||
static const sstring all_users_query = sprint(
|
||||
"SELECT * FROM %s.%s LIMIT 1",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF);
|
||||
|
||||
// This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we
|
||||
// can potentially avoid doing a range query with a high consistency level.
|
||||
|
||||
return _qp.process(
|
||||
default_user_query,
|
||||
db::consistency_level::ONE,
|
||||
{ meta::DEFAULT_SUPERUSER_NAME },
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
return _qp.process(
|
||||
default_user_query,
|
||||
db::consistency_level::QUORUM,
|
||||
{ meta::DEFAULT_SUPERUSER_NAME },
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
return _qp.process(
|
||||
all_users_query,
|
||||
db::consistency_level::QUORUM).then([](auto results) {
|
||||
return make_ready_future<bool>(!results->empty());
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> service::is_existing_user(const sstring& name) const {
|
||||
return select_user(_qp, name).then([](auto results) {
|
||||
return !results->empty();
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> service::is_super_user(const sstring& name) const {
|
||||
return select_user(_qp, name).then([](auto results) {
|
||||
return !results->empty() && results->one().template get_as<bool>(meta::superuser_col_name);
|
||||
});
|
||||
}
|
||||
|
||||
future<> service::insert_user(const sstring& name, bool is_superuser) {
|
||||
return _qp.process(
|
||||
sprint(
|
||||
"INSERT INTO %s.%s (%s, %s) VALUES (?, ?)",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF,
|
||||
meta::user_name_col_name,
|
||||
meta::superuser_col_name),
|
||||
consistency_for_user(name),
|
||||
{ name, is_superuser }).discard_result();
|
||||
}
|
||||
|
||||
future<> service::delete_user(const sstring& name) {
|
||||
return _qp.process(
|
||||
sprint(
|
||||
"DELETE FROM %s.%s WHERE %s = ?",
|
||||
meta::AUTH_KS,
|
||||
meta::USERS_CF,
|
||||
meta::user_name_col_name),
|
||||
consistency_for_user(name),
|
||||
{ name }).discard_result();
|
||||
}
|
||||
|
||||
future<permission_set> service::get_permissions(::shared_ptr<authenticated_user> u, data_resource r) const {
|
||||
return sharded_permissions_cache.local().get(std::move(u), std::move(r));
|
||||
}
|
||||
|
||||
//
|
||||
// Free functions.
|
||||
//
|
||||
|
||||
future<bool> is_super_user(const service& ser, const authenticated_user& u) {
|
||||
if (u.is_anonymous()) {
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
|
||||
return ser.is_super_user(u.name());
|
||||
}
|
||||
|
||||
}
|
||||
132
auth/service.hh
Normal file
132
auth/service.hh
Normal file
@@ -0,0 +1,132 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "auth/permissions_cache.hh"
|
||||
#include "delayed_tasks.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
class migration_listener;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
class authenticator;
|
||||
class authorizer;
|
||||
|
||||
struct service_config final {
|
||||
static service_config from_db_config(const db::config&);
|
||||
|
||||
sstring authorizer_java_name;
|
||||
sstring authenticator_java_name;
|
||||
};
|
||||
|
||||
class service final {
|
||||
permissions_cache_config _cache_config;
|
||||
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
::service::migration_manager& _migration_manager;
|
||||
|
||||
std::unique_ptr<authorizer> _authorizer;
|
||||
|
||||
std::unique_ptr<authenticator> _authenticator;
|
||||
|
||||
// Only one of these should be registered, so we end up with some unused instances. Not the end of the world.
|
||||
std::unique_ptr<::service::migration_listener> _migration_listener;
|
||||
|
||||
delayed_tasks<> _delayed{};
|
||||
|
||||
public:
|
||||
service(
|
||||
permissions_cache_config,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&,
|
||||
std::unique_ptr<authorizer>,
|
||||
std::unique_ptr<authenticator>);
|
||||
|
||||
service(
|
||||
permissions_cache_config,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&,
|
||||
const service_config&);
|
||||
|
||||
future<> start();
|
||||
|
||||
future<> stop();
|
||||
|
||||
future<bool> is_existing_user(const sstring& name) const;
|
||||
|
||||
future<bool> is_super_user(const sstring& name) const;
|
||||
|
||||
future<> insert_user(const sstring& name, bool is_superuser);
|
||||
|
||||
future<> delete_user(const sstring& name);
|
||||
|
||||
future<permission_set> get_permissions(::shared_ptr<authenticated_user>, data_resource) const;
|
||||
|
||||
authenticator& underlying_authenticator() {
|
||||
return *_authenticator;
|
||||
}
|
||||
|
||||
const authenticator& underlying_authenticator() const {
|
||||
return *_authenticator;
|
||||
}
|
||||
|
||||
authorizer& underlying_authorizer() {
|
||||
return *_authorizer;
|
||||
}
|
||||
|
||||
const authorizer& underlying_authorizer() const {
|
||||
return *_authorizer;
|
||||
}
|
||||
|
||||
private:
|
||||
future<bool> has_existing_users() const;
|
||||
|
||||
bool should_create_metadata() const;
|
||||
|
||||
future<> create_metadata_if_missing();
|
||||
};
|
||||
|
||||
future<bool> is_super_user(const service&, const authenticated_user&);
|
||||
|
||||
}
|
||||
@@ -46,12 +46,13 @@
|
||||
#include "password_authenticator.hh"
|
||||
#include "default_authorizer.hh"
|
||||
#include "permission.hh"
|
||||
#include "auth.hh"
|
||||
#include "db/config.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
class service;
|
||||
|
||||
static const sstring PACKAGE_NAME("com.scylladb.auth.");
|
||||
|
||||
static const sstring& transitional_authenticator_name() {
|
||||
@@ -69,8 +70,8 @@ class transitional_authenticator : public authenticator {
|
||||
public:
|
||||
static const sstring PASSWORD_AUTHENTICATOR_NAME;
|
||||
|
||||
transitional_authenticator(cql3::query_processor& qp)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp))
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, mm))
|
||||
{}
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a))
|
||||
@@ -153,8 +154,8 @@ public:
|
||||
class transitional_authorizer : public authorizer {
|
||||
std::unique_ptr<authorizer> _authorizer;
|
||||
public:
|
||||
transitional_authorizer(cql3::query_processor& qp)
|
||||
: transitional_authorizer(std::make_unique<default_authorizer>(qp))
|
||||
transitional_authorizer(cql3::query_processor& qp, ::service::migration_manager& mm)
|
||||
: transitional_authorizer(std::make_unique<default_authorizer>(qp, mm))
|
||||
{}
|
||||
transitional_authorizer(std::unique_ptr<authorizer> a)
|
||||
: _authorizer(std::move(a))
|
||||
@@ -170,8 +171,8 @@ public:
|
||||
const sstring& qualified_java_name() const override {
|
||||
return transitional_authorizer_name();
|
||||
}
|
||||
future<permission_set> authorize(::shared_ptr<authenticated_user> user, data_resource resource) const override {
|
||||
return user->is_super().then([](bool s) {
|
||||
future<permission_set> authorize(service& ser, ::shared_ptr<authenticated_user> user, data_resource resource) const override {
|
||||
return is_super_user(ser, *user).then([](bool s) {
|
||||
static const permission_set transitional_permissions =
|
||||
permission_set::of<permission::CREATE,
|
||||
permission::ALTER, permission::DROP,
|
||||
@@ -186,8 +187,8 @@ public:
|
||||
future<> revoke(::shared_ptr<authenticated_user> user, permission_set ps, data_resource r, sstring s) override {
|
||||
return _authorizer->revoke(std::move(user), std::move(ps), std::move(r), std::move(s));
|
||||
}
|
||||
future<std::vector<permission_details>> list(::shared_ptr<authenticated_user> user, permission_set ps, optional<data_resource> r, optional<sstring> s) const override {
|
||||
return _authorizer->list(std::move(user), std::move(ps), std::move(r), std::move(s));
|
||||
future<std::vector<permission_details>> list(service& ser, ::shared_ptr<authenticated_user> user, permission_set ps, optional<data_resource> r, optional<sstring> s) const override {
|
||||
return _authorizer->list(ser, std::move(user), std::move(ps), std::move(r), std::move(s));
|
||||
}
|
||||
future<> revoke_all(sstring s) override {
|
||||
return _authorizer->revoke_all(std::move(s));
|
||||
@@ -209,9 +210,14 @@ public:
|
||||
// To ensure correct initialization order, we unfortunately need to use string literals.
|
||||
//
|
||||
|
||||
static const class_registrator<auth::authenticator,
|
||||
auth::transitional_authenticator, cql3::query_processor&> transitional_authenticator_reg(
|
||||
"com.scylladb.auth.TransitionalAuthenticator");
|
||||
static const class_registrator<
|
||||
auth::authenticator,
|
||||
auth::transitional_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&> transitional_authenticator_reg("com.scylladb.auth.TransitionalAuthenticator");
|
||||
|
||||
static const class_registrator<auth::authorizer, auth::transitional_authorizer, cql3::query_processor&> transitional_authorizer_reg(
|
||||
"com.scylladb.auth.TransitionalAuthorizer");
|
||||
static const class_registrator<
|
||||
auth::authorizer,
|
||||
auth::transitional_authorizer,
|
||||
cql3::query_processor&,
|
||||
::service::migration_manager&> transitional_authorizer_reg("com.scylladb.auth.TransitionalAuthorizer");
|
||||
|
||||
@@ -524,16 +524,17 @@ scylla_core = (['database.cc',
|
||||
'lister.cc',
|
||||
'repair/repair.cc',
|
||||
'exceptions/exceptions.cc',
|
||||
'auth/auth.cc',
|
||||
'auth/allow_all_authenticator.cc',
|
||||
'auth/allow_all_authorizer.cc',
|
||||
'auth/authenticated_user.cc',
|
||||
'auth/authenticator.cc',
|
||||
'auth/authorizer.cc',
|
||||
'auth/common.cc',
|
||||
'auth/default_authorizer.cc',
|
||||
'auth/data_resource.cc',
|
||||
'auth/password_authenticator.cc',
|
||||
'auth/permission.cc',
|
||||
'auth/permissions_cache.cc',
|
||||
'auth/service.cc',
|
||||
'auth/transitional.cc',
|
||||
'tracing/tracing.cc',
|
||||
'tracing/trace_keyspace_helper.cc',
|
||||
|
||||
@@ -42,8 +42,8 @@
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
#include "alter_user_statement.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/service.hh"
|
||||
|
||||
cql3::statements::alter_user_statement::alter_user_statement(sstring username, ::shared_ptr<user_options> opts, std::experimental::optional<bool> superuser)
|
||||
: _username(std::move(username))
|
||||
@@ -52,7 +52,7 @@ cql3::statements::alter_user_statement::alter_user_statement(sstring username, :
|
||||
{}
|
||||
|
||||
void cql3::statements::alter_user_statement::validate(distributed<service::storage_proxy>& proxy, const service::client_state& state) {
|
||||
_opts->validate();
|
||||
_opts->validate(state.get_auth_service()->underlying_authenticator());
|
||||
|
||||
if (!_superuser && _opts->empty()) {
|
||||
throw exceptions::invalid_request_exception("ALTER USER can't be empty");
|
||||
@@ -73,7 +73,10 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
|
||||
// my disgust.
|
||||
throw exceptions::unauthorized_exception("You aren't allowed to alter your own superuser status");
|
||||
}
|
||||
return user->is_super().then([this, user](bool is_super) {
|
||||
|
||||
const auto& auth_service = *state.get_auth_service();
|
||||
|
||||
return auth::is_super_user(auth_service, *user).then([this, user, &auth_service](bool is_super) {
|
||||
if (_superuser && !is_super) {
|
||||
throw exceptions::unauthorized_exception("Only superusers are allowed to alter superuser status");
|
||||
}
|
||||
@@ -84,7 +87,7 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
|
||||
|
||||
if (!is_super) {
|
||||
for (auto o : _opts->options() | boost::adaptors::map_keys) {
|
||||
if (!auth::authenticator::get().alterable_options().contains(o)) {
|
||||
if (!auth_service.underlying_authenticator().alterable_options().contains(o)) {
|
||||
throw exceptions::unauthorized_exception(sprint("You aren't allowed to alter {} option", o));
|
||||
}
|
||||
}
|
||||
@@ -94,14 +97,17 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::alter_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
|
||||
return auth::auth::is_existing_user(_username).then([this](bool exists) {
|
||||
auto& client_state = state.get_client_state();
|
||||
auto& auth_service = *client_state.get_auth_service();
|
||||
|
||||
return auth_service.is_existing_user(_username).then([this, &auth_service](bool exists) {
|
||||
if (!exists) {
|
||||
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
|
||||
}
|
||||
auto f = _opts->options().empty() ? make_ready_future() : auth::authenticator::get().alter(_username, _opts->options());
|
||||
auto f = _opts->options().empty() ? make_ready_future() : auth_service.underlying_authenticator().alter(_username, _opts->options());
|
||||
if (_superuser) {
|
||||
f = f.then([this] {
|
||||
return auth::auth::insert_user(_username, *_superuser);
|
||||
f = f.then([this, &auth_service] {
|
||||
return auth_service.insert_user(_username, *_superuser);
|
||||
});
|
||||
}
|
||||
return f.then([] { return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(); });
|
||||
|
||||
@@ -40,8 +40,8 @@
|
||||
*/
|
||||
|
||||
#include "create_user_statement.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/service.hh"
|
||||
|
||||
cql3::statements::create_user_statement::create_user_statement(sstring username, ::shared_ptr<user_options> opts, bool superuser, bool if_not_exists)
|
||||
: _username(std::move(username))
|
||||
@@ -55,7 +55,7 @@ void cql3::statements::create_user_statement::validate(distributed<service::stor
|
||||
throw exceptions::invalid_request_exception("Username can't be an empty string");
|
||||
}
|
||||
|
||||
_opts->validate();
|
||||
_opts->validate(state.get_auth_service()->underlying_authenticator());
|
||||
|
||||
// validate login here before checkAccess to avoid leaking user existence to anonymous users.
|
||||
state.ensure_not_anonymous();
|
||||
@@ -66,19 +66,22 @@ void cql3::statements::create_user_statement::validate(distributed<service::stor
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::create_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
|
||||
return state.get_client_state().user()->is_super().then([this](bool is_super) {
|
||||
auto& client_state = state.get_client_state();
|
||||
auto& auth_service = *client_state.get_auth_service();
|
||||
|
||||
return auth::is_super_user(auth_service, *client_state.user()).then([this, &auth_service](bool is_super) {
|
||||
if (!is_super) {
|
||||
throw exceptions::unauthorized_exception("Only superusers are allowed to perform CREATE USER queries");
|
||||
}
|
||||
return auth::auth::is_existing_user(_username).then([this](bool exists) {
|
||||
return auth_service.is_existing_user(_username).then([this, &auth_service](bool exists) {
|
||||
if (exists && !_if_not_exists) {
|
||||
throw exceptions::invalid_request_exception(sprint("User %s already exists", _username));
|
||||
}
|
||||
if (exists && _if_not_exists) {
|
||||
make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
||||
}
|
||||
return auth::authenticator::get().create(_username, _opts->options()).then([this] {
|
||||
return auth::auth::insert_user(_username, _superuser).then([] {
|
||||
return auth_service.underlying_authenticator().create(_username, _opts->options()).then([this, &auth_service] {
|
||||
return auth_service.insert_user(_username, _superuser).then([] {
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -42,9 +42,9 @@
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
#include "drop_user_statement.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/service.hh"
|
||||
|
||||
cql3::statements::drop_user_statement::drop_user_statement(sstring username, bool if_exists)
|
||||
: _username(std::move(username))
|
||||
@@ -65,12 +65,15 @@ void cql3::statements::drop_user_statement::validate(distributed<service::storag
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::drop_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
|
||||
return state.get_client_state().user()->is_super().then([this](bool is_super) {
|
||||
auto& client_state = state.get_client_state();
|
||||
auto& auth_service = *client_state.get_auth_service();
|
||||
|
||||
return auth::is_super_user(auth_service, *client_state.user()).then([this, &auth_service](bool is_super) {
|
||||
if (!is_super) {
|
||||
throw exceptions::unauthorized_exception("Only superusers are allowed to perform DROP USER queries");
|
||||
}
|
||||
|
||||
return auth::auth::is_existing_user(_username).then([this](bool exists) {
|
||||
return auth_service.is_existing_user(_username).then([this, &auth_service](bool exists) {
|
||||
if (!_if_exists && !exists) {
|
||||
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
|
||||
}
|
||||
@@ -79,9 +82,9 @@ cql3::statements::drop_user_statement::execute(distributed<service::storage_prox
|
||||
}
|
||||
|
||||
// clean up permissions after the dropped user.
|
||||
return auth::authorizer::get().revoke_all(_username).then([this] {
|
||||
return auth::auth::delete_user(_username).then([this] {
|
||||
return auth::authenticator::get().drop(_username);
|
||||
return auth_service.underlying_authorizer().revoke_all(_username).then([this, &auth_service] {
|
||||
return auth_service.delete_user(_username).then([this, &auth_service] {
|
||||
return auth_service.underlying_authenticator().drop(_username);
|
||||
});
|
||||
}).then([] {
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
||||
|
||||
@@ -44,7 +44,10 @@
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::grant_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
|
||||
return auth::authorizer::get().grant(state.get_client_state().user(), _permissions, _resource, _username).then([] {
|
||||
auto& client_state = state.get_client_state();
|
||||
auto& auth_service = *client_state.get_auth_service();
|
||||
|
||||
return auth_service.underlying_authorizer().grant(client_state.user(), _permissions, _resource, _username).then([] {
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -44,7 +44,6 @@
|
||||
|
||||
#include "list_permissions_statement.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
@@ -65,7 +64,7 @@ void cql3::statements::list_permissions_statement::validate(distributed<service:
|
||||
future<> cql3::statements::list_permissions_statement::check_access(const service::client_state& state) {
|
||||
auto f = make_ready_future();
|
||||
if (_username) {
|
||||
f = auth::auth::is_existing_user(*_username).then([this](bool exists) {
|
||||
f = state.get_auth_service()->is_existing_user(*_username).then([this](bool exists) {
|
||||
if (!exists) {
|
||||
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", *_username));
|
||||
}
|
||||
@@ -105,7 +104,8 @@ cql3::statements::list_permissions_statement::execute(distributed<service::stora
|
||||
}
|
||||
|
||||
return map_reduce(resources, [&state, this](opt_resource r) {
|
||||
return auth::authorizer::get().list(state.get_client_state().user(), _permissions, std::move(r), _username);
|
||||
auto& auth_service = *state.get_client_state().get_auth_service();
|
||||
return auth_service.underlying_authorizer().list(auth_service, state.get_client_state().user(), _permissions, std::move(r), _username);
|
||||
}, std::vector<auth::permission_details>(), [](std::vector<auth::permission_details> details, std::vector<auth::permission_details> pd) {
|
||||
details.insert(details.end(), pd.begin(), pd.end());
|
||||
return std::move(details);
|
||||
|
||||
@@ -41,11 +41,11 @@
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
#include "auth/service.hh"
|
||||
#include "permission_altering_statement.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "auth/auth.hh"
|
||||
|
||||
cql3::statements::permission_altering_statement::permission_altering_statement(
|
||||
auth::permission_set permissions, auth::data_resource resource,
|
||||
@@ -60,7 +60,7 @@ void cql3::statements::permission_altering_statement::validate(distributed<servi
|
||||
}
|
||||
|
||||
future<> cql3::statements::permission_altering_statement::check_access(const service::client_state& state) {
|
||||
return auth::auth::is_existing_user(_username).then([this, &state](bool exists) {
|
||||
return state.get_auth_service()->is_existing_user(_username).then([this, &state](bool exists) {
|
||||
if (!exists) {
|
||||
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
|
||||
}
|
||||
|
||||
@@ -44,7 +44,10 @@
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::revoke_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
|
||||
return auth::authorizer::get().revoke(state.get_client_state().user(), _permissions, _resource, _username).then([] {
|
||||
auto& client_state = state.get_client_state();
|
||||
auto& auth_service = *client_state.get_auth_service();
|
||||
|
||||
return auth_service.underlying_authorizer().revoke(client_state.user(), _permissions, _resource, _username).then([] {
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -49,8 +49,7 @@ void cql3::user_options::put(const sstring& name, const sstring& value) {
|
||||
_options[auth::authenticator::string_to_option(name)] = value;
|
||||
}
|
||||
|
||||
void cql3::user_options::validate() const {
|
||||
auto& a = auth::authenticator::get();
|
||||
void cql3::user_options::validate(const auth::authenticator& a) const {
|
||||
for (auto o : _options | boost::adaptors::map_keys) {
|
||||
if (!a.supported_options().contains(o)) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
|
||||
@@ -42,6 +42,10 @@
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
|
||||
namespace auth {
|
||||
class authenticator;
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class user_options {
|
||||
@@ -56,7 +60,7 @@ public:
|
||||
const auth::authenticator::option_map& options() const {
|
||||
return _options;
|
||||
}
|
||||
void validate() const;
|
||||
void validate(const auth::authenticator&) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
4
init.cc
4
init.cc
@@ -34,8 +34,8 @@ logging::logger startlog("init");
|
||||
// duplicated in cql_test_env.cc
|
||||
// until proper shutdown is done.
|
||||
|
||||
void init_storage_service(distributed<database>& db) {
|
||||
service::init_storage_service(db).get();
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
|
||||
service::init_storage_service(db, auth_service).get();
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
}
|
||||
|
||||
3
init.hh
3
init.hh
@@ -23,6 +23,7 @@
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
#include "database.hh"
|
||||
#include "log.hh"
|
||||
@@ -31,7 +32,7 @@ extern logging::logger startlog;
|
||||
|
||||
class bad_configuration_error : public std::exception {};
|
||||
|
||||
void init_storage_service(distributed<database>& db);
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&);
|
||||
void init_ms_fd_gossiper(sstring listen_address
|
||||
, uint16_t storage_port
|
||||
, uint16_t ssl_storage_port
|
||||
|
||||
3
main.cc
3
main.cc
@@ -436,8 +436,9 @@ int main(int ac, char** av) {
|
||||
api::set_server_init(ctx).get();
|
||||
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
|
||||
startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
|
||||
static sharded<auth::service> auth_service;
|
||||
supervisor::notify("initializing storage service");
|
||||
init_storage_service(db);
|
||||
init_storage_service(db, auth_service);
|
||||
supervisor::notify("starting per-shard database core");
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
db.start(std::ref(*cfg)).get();
|
||||
|
||||
@@ -40,7 +40,6 @@
|
||||
*/
|
||||
|
||||
#include "client_state.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/common.hh"
|
||||
@@ -62,7 +61,7 @@ future<> service::client_state::check_user_exists() {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
return auth::auth::is_existing_user(_user->name()).then([user = _user](bool exists) mutable {
|
||||
return _auth_service->is_existing_user(_user->name()).then([user = _user](bool exists) mutable {
|
||||
if (!exists) {
|
||||
throw exceptions::authentication_exception(
|
||||
sprint("User %s doesn't exist - create it with CREATE USER query first",
|
||||
@@ -161,8 +160,8 @@ future<> service::client_state::has_access(const sstring& ks, auth::permission p
|
||||
return make_ready_future();
|
||||
}
|
||||
if (auth::permissions::ALTERATIONS.contains(p)) {
|
||||
for (auto& s : { auth::authorizer::get().protected_resources(),
|
||||
auth::authenticator::get().protected_resources() }) {
|
||||
for (auto& s : { _auth_service->underlying_authorizer().protected_resources(),
|
||||
_auth_service->underlying_authorizer().protected_resources() }) {
|
||||
if (s.count(resource)) {
|
||||
throw exceptions::unauthorized_exception(
|
||||
sprint("%s schema is protected",
|
||||
@@ -175,12 +174,16 @@ future<> service::client_state::has_access(const sstring& ks, auth::permission p
|
||||
}
|
||||
|
||||
future<bool> service::client_state::check_has_permission(auth::permission p, auth::data_resource resource) const {
|
||||
if (_is_internal) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
std::experimental::optional<auth::data_resource> parent;
|
||||
if (resource.has_parent()) {
|
||||
parent = resource.get_parent();
|
||||
}
|
||||
|
||||
return auth::auth::get_permissions(_user, resource).then([this, p, parent = std::move(parent)](auth::permission_set set) {
|
||||
return _auth_service->get_permissions(_user, resource).then([this, p, parent = std::move(parent)](auth::permission_set set) {
|
||||
if (set.contains(p)) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "auth/service.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "timestamp.hh"
|
||||
@@ -99,6 +100,8 @@ private:
|
||||
// Address of a client
|
||||
socket_address _remote_address;
|
||||
|
||||
// Only populated for external client state.
|
||||
auth::service* _auth_service{nullptr};
|
||||
public:
|
||||
struct internal_tag {};
|
||||
struct external_tag {};
|
||||
@@ -122,11 +125,12 @@ public:
|
||||
return _trace_state_ptr;
|
||||
}
|
||||
|
||||
client_state(external_tag, const socket_address& remote_address = socket_address(), bool thrift = false)
|
||||
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
|
||||
: _is_internal(false)
|
||||
, _is_thrift(thrift)
|
||||
, _remote_address(remote_address) {
|
||||
if (!auth::authenticator::get().require_authentication()) {
|
||||
, _remote_address(remote_address)
|
||||
, _auth_service(&auth_service) {
|
||||
if (!auth_service.underlying_authenticator().require_authentication()) {
|
||||
_user = ::make_shared<auth::authenticated_user>();
|
||||
}
|
||||
}
|
||||
@@ -137,6 +141,16 @@ public:
|
||||
|
||||
client_state(internal_tag) : _keyspace("system"), _is_internal(true), _is_thrift(false) {}
|
||||
|
||||
// `nullptr` for internal instances.
|
||||
auth::service* get_auth_service() {
|
||||
return _auth_service;
|
||||
}
|
||||
|
||||
// See above.
|
||||
const auth::service* get_auth_service() const {
|
||||
return _auth_service;
|
||||
}
|
||||
|
||||
void merge(const client_state& other);
|
||||
|
||||
bool is_thrift() const {
|
||||
@@ -155,13 +169,15 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* The `auth::service` should be non-`nullptr` for native protocol users.
|
||||
*
|
||||
* @return a ClientState object for external clients (thrift/native protocol users).
|
||||
*/
|
||||
static client_state for_external_calls() {
|
||||
return client_state(external_tag());
|
||||
static client_state for_external_calls(auth::service& ser) {
|
||||
return client_state(external_tag(), ser);
|
||||
}
|
||||
static client_state for_external_thrift_calls() {
|
||||
return client_state(external_tag(), socket_address(), true);
|
||||
static client_state for_external_thrift_calls(auth::service& ser) {
|
||||
return client_state(external_tag(), ser, socket_address(), true);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -288,3 +304,4 @@ public:
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -65,7 +65,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include "db/batchlog_manager.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include <seastar/net/tls.hh>
|
||||
#include <seastar/net/dns.hh>
|
||||
#include "utils/exceptions.hh"
|
||||
@@ -100,8 +99,9 @@ int get_generation_number() {
|
||||
return generation_number;
|
||||
}
|
||||
|
||||
storage_service::storage_service(distributed<database>& db)
|
||||
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service)
|
||||
: _db(db)
|
||||
, _auth_service(auth_service)
|
||||
, _replicate_action([this] { return do_replicate_to_all_cores(); }) {
|
||||
sstable_read_error.connect([this] { isolate_on_error(); });
|
||||
sstable_write_error.connect([this] { isolate_on_error(); });
|
||||
@@ -521,7 +521,15 @@ void storage_service::join_token_ring(int delay) {
|
||||
slogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
auth::auth::setup().get();
|
||||
|
||||
_auth_service.start(
|
||||
auth::permissions_cache_config::from_db_config(_db.local().get_config()),
|
||||
std::ref(cql3::get_query_processor()),
|
||||
std::ref(service::get_migration_manager()),
|
||||
auth::service_config::from_db_config(_db.local().get_config())).get();
|
||||
|
||||
_auth_service.invoke_on_all(&auth::service::start).get();
|
||||
|
||||
supervisor::notify("starting tracing");
|
||||
tracing::tracing::start_tracing().get();
|
||||
} else {
|
||||
@@ -546,7 +554,14 @@ future<> storage_service::join_ring() {
|
||||
slogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
auth::auth::setup().get();
|
||||
|
||||
ss._auth_service.start(
|
||||
auth::permissions_cache_config::from_db_config(ss._db.local().get_config()),
|
||||
std::ref(cql3::get_query_processor()),
|
||||
std::ref(service::get_migration_manager()),
|
||||
auth::service_config::from_db_config(ss._db.local().get_config())).get();
|
||||
|
||||
ss._auth_service.invoke_on_all(&auth::service::start).get();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -1178,7 +1193,7 @@ future<> storage_service::stop_transport() {
|
||||
ss.do_stop_stream_manager().get();
|
||||
slogger.info("Stop transport: shutdown stream_manager done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
ss._auth_service.stop().get();
|
||||
slogger.info("Stop transport: auth shutdown");
|
||||
|
||||
slogger.info("Stop transport: done");
|
||||
@@ -1918,7 +1933,7 @@ future<> storage_service::start_rpc_server() {
|
||||
auto addr = cfg.rpc_address();
|
||||
auto keepalive = cfg.rpc_keepalive();
|
||||
return seastar::net::dns::resolve_name(addr).then([&ss, tserver, addr, port, keepalive] (seastar::net::inet_address ip) {
|
||||
return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor())).then([tserver, port, addr, ip, keepalive] {
|
||||
return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service)).then([tserver, port, addr, ip, keepalive] {
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([tserver] {
|
||||
// return tserver->stop();
|
||||
@@ -1969,8 +1984,8 @@ future<> storage_service::start_native_transport() {
|
||||
auto ceo = cfg.client_encryption_options();
|
||||
auto keepalive = cfg.rpc_keepalive();
|
||||
cql_transport::cql_load_balance lb = cql_transport::parse_load_balance(cfg.load_balance());
|
||||
return seastar::net::dns::resolve_name(addr).then([cserver, addr, &cfg, lb, keepalive, ceo = std::move(ceo)] (seastar::net::inet_address ip) {
|
||||
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
|
||||
return seastar::net::dns::resolve_name(addr).then([&ss, cserver, addr, &cfg, lb, keepalive, ceo = std::move(ceo)] (seastar::net::inet_address ip) {
|
||||
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb, std::ref(ss._auth_service)).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([cserver] {
|
||||
// return cserver->stop();
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "auth/service.hh"
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
@@ -113,6 +114,7 @@ private:
|
||||
private final AtomicLong notificationSerialNumber = new AtomicLong();
|
||||
#endif
|
||||
distributed<database>& _db;
|
||||
sharded<auth::service>& _auth_service;
|
||||
int _update_jobs{0};
|
||||
// Note that this is obviously only valid for the current shard. Users of
|
||||
// this facility should elect a shard to be the coordinator based on any
|
||||
@@ -129,7 +131,7 @@ private:
|
||||
bool _ms_stopped = false;
|
||||
bool _stream_manager_stopped = false;
|
||||
public:
|
||||
storage_service(distributed<database>& db);
|
||||
storage_service(distributed<database>& db, sharded<auth::service>&);
|
||||
void isolate_on_error();
|
||||
void isolate_on_commit_error();
|
||||
|
||||
@@ -2243,8 +2245,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline future<> init_storage_service(distributed<database>& db) {
|
||||
return service::get_storage_service().start(std::ref(db));
|
||||
inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
|
||||
return service::get_storage_service().start(std::ref(db), std::ref(auth_service));
|
||||
}
|
||||
|
||||
inline future<> deinit_storage_service() {
|
||||
|
||||
@@ -34,10 +34,11 @@
|
||||
#include "tests/cql_test_env.hh"
|
||||
#include "tests/cql_assertions.hh"
|
||||
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
#include "auth/data_resource.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/password_authenticator.hh"
|
||||
#include "auth/service.hh"
|
||||
#include "auth/authenticated_user.hh"
|
||||
|
||||
#include "db/config.hh"
|
||||
@@ -73,9 +74,10 @@ SEASTAR_TEST_CASE(test_data_resource) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_default_authenticator) {
|
||||
return do_with_cql_env([](cql_test_env&) {
|
||||
BOOST_REQUIRE_EQUAL(auth::authenticator::get().require_authentication(), false);
|
||||
BOOST_REQUIRE_EQUAL(auth::authenticator::get().qualified_java_name(), auth::allow_all_authenticator_name());
|
||||
return do_with_cql_env([](cql_test_env& env) {
|
||||
auto& a = env.local_auth_service().underlying_authenticator();
|
||||
BOOST_REQUIRE_EQUAL(a.require_authentication(), false);
|
||||
BOOST_REQUIRE_EQUAL(a.qualified_java_name(), auth::allow_all_authenticator_name());
|
||||
return make_ready_future();
|
||||
});
|
||||
}
|
||||
@@ -84,9 +86,10 @@ SEASTAR_TEST_CASE(test_password_authenticator_attributes) {
|
||||
db::config cfg;
|
||||
cfg.authenticator(auth::password_authenticator_name());
|
||||
|
||||
return do_with_cql_env([](cql_test_env&) {
|
||||
BOOST_REQUIRE_EQUAL(auth::authenticator::get().require_authentication(), true);
|
||||
BOOST_REQUIRE_EQUAL(auth::authenticator::get().qualified_java_name(), auth::password_authenticator_name());
|
||||
return do_with_cql_env([](cql_test_env& env) {
|
||||
auto& a = env.local_auth_service().underlying_authenticator();
|
||||
BOOST_REQUIRE_EQUAL(a.require_authentication(), true);
|
||||
BOOST_REQUIRE_EQUAL(a.qualified_java_name(), auth::password_authenticator_name());
|
||||
return make_ready_future();
|
||||
}, cfg);
|
||||
}
|
||||
@@ -95,20 +98,22 @@ SEASTAR_TEST_CASE(test_auth_users) {
|
||||
db::config cfg;
|
||||
cfg.authenticator(auth::password_authenticator_name());
|
||||
|
||||
return do_with_cql_env([](cql_test_env&) {
|
||||
return seastar::async([] {
|
||||
return do_with_cql_env([](cql_test_env& env) {
|
||||
return seastar::async([&env] {
|
||||
auto& auth = env.local_auth_service();
|
||||
|
||||
sstring username("fisk");
|
||||
auth::auth::insert_user(username, false).get();
|
||||
BOOST_REQUIRE_EQUAL(auth::auth::is_existing_user(username).get0(), true);
|
||||
BOOST_REQUIRE_EQUAL(auth::auth::is_super_user(username).get0(), false);
|
||||
auth.insert_user(username, false).get();
|
||||
BOOST_REQUIRE_EQUAL(auth.is_existing_user(username).get0(), true);
|
||||
BOOST_REQUIRE_EQUAL(auth.is_super_user(username).get0(), false);
|
||||
|
||||
auth::auth::insert_user(username, true).get();
|
||||
BOOST_REQUIRE_EQUAL(auth::auth::is_existing_user(username).get0(), true);
|
||||
BOOST_REQUIRE_EQUAL(auth::auth::is_super_user(username).get0(), true);
|
||||
auth.insert_user(username, true).get();
|
||||
BOOST_REQUIRE_EQUAL(auth.is_existing_user(username).get0(), true);
|
||||
BOOST_REQUIRE_EQUAL(auth.is_super_user(username).get0(), true);
|
||||
|
||||
auth::auth::delete_user(username).get();
|
||||
BOOST_REQUIRE_EQUAL(auth::auth::is_existing_user(username).get0(), false);
|
||||
BOOST_REQUIRE_EQUAL(auth::auth::is_super_user(username).get0(), false);
|
||||
auth.delete_user(username).get();
|
||||
BOOST_REQUIRE_EQUAL(auth.is_existing_user(username).get0(), false);
|
||||
BOOST_REQUIRE_EQUAL(auth.is_super_user(username).get0(), false);
|
||||
});
|
||||
}, cfg);
|
||||
}
|
||||
@@ -121,7 +126,7 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
|
||||
* Not using seastar::async due to apparent ASan bug.
|
||||
* Enjoy the slightly less readable code.
|
||||
*/
|
||||
return do_with_cql_env([](cql_test_env&) {
|
||||
return do_with_cql_env([](cql_test_env& env) {
|
||||
sstring username("fisk");
|
||||
sstring password("notter");
|
||||
|
||||
@@ -132,25 +137,26 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
|
||||
auto USERNAME_KEY = authenticator::USERNAME_KEY;
|
||||
auto PASSWORD_KEY = authenticator::PASSWORD_KEY;
|
||||
|
||||
auto& a = env.local_auth_service().underlying_authenticator();
|
||||
|
||||
// check non-existing user
|
||||
return authenticator::get().authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([](future<user_ptr>&& f) {
|
||||
return a.authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([&a](future<user_ptr>&& f) {
|
||||
try {
|
||||
f.get();
|
||||
BOOST_FAIL("should not reach");
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
// ok
|
||||
}
|
||||
}).then([=] {
|
||||
return authenticator::get().create(username, { { option::PASSWORD, password} }).then([=] {
|
||||
return authenticator::get().authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then([=](user_ptr user) {
|
||||
}).then([=, &a] {
|
||||
return a.create(username, { { option::PASSWORD, password} }).then([=, &a] {
|
||||
return a.authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then([=](user_ptr user) {
|
||||
BOOST_REQUIRE_EQUAL(user->name(), username);
|
||||
BOOST_REQUIRE_EQUAL(user->is_anonymous(), false);
|
||||
});
|
||||
});
|
||||
}).then([=] {
|
||||
}).then([=, &a] {
|
||||
// check wrong password
|
||||
return authenticator::get().authenticate( { {USERNAME_KEY, username}, {PASSWORD_KEY, "hejkotte"}}).then_wrapped([](future<user_ptr>&& f) {
|
||||
return a.authenticate( { {USERNAME_KEY, username}, {PASSWORD_KEY, "hejkotte"}}).then_wrapped([](future<user_ptr>&& f) {
|
||||
try {
|
||||
f.get();
|
||||
BOOST_FAIL("should not reach");
|
||||
@@ -158,9 +164,9 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
|
||||
// ok
|
||||
}
|
||||
});
|
||||
}).then([=] {
|
||||
}).then([=, &a] {
|
||||
// sasl
|
||||
auto sasl = authenticator::get().new_sasl_challenge();
|
||||
auto sasl = a.new_sasl_challenge();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(sasl->is_complete(), false);
|
||||
|
||||
@@ -178,10 +184,10 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
|
||||
BOOST_REQUIRE_EQUAL(user->name(), username);
|
||||
BOOST_REQUIRE_EQUAL(user->is_anonymous(), false);
|
||||
});
|
||||
}).then([=] {
|
||||
}).then([=, &a] {
|
||||
// check deleted user
|
||||
return authenticator::get().drop(username).then([=] {
|
||||
return authenticator::get().authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([](future<user_ptr>&& f) {
|
||||
return a.drop(username).then([=, &a] {
|
||||
return a.authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([](future<user_ptr>&& f) {
|
||||
try {
|
||||
f.get();
|
||||
BOOST_FAIL("should not reach");
|
||||
@@ -215,8 +221,8 @@ SEASTAR_TEST_CASE(test_cassandra_hash) {
|
||||
auto f = env.local_qp().process("INSERT into system_auth.credentials (username, salted_hash) values (?, ?)", db::consistency_level::ONE,
|
||||
{ username, salted_hash }).discard_result();
|
||||
|
||||
return f.then([=] {
|
||||
auto& a = auth::authenticator::get();
|
||||
return f.then([=, &env] {
|
||||
auto& a = env.local_auth_service().underlying_authenticator();
|
||||
|
||||
auto USERNAME_KEY = auth::authenticator::USERNAME_KEY;
|
||||
auto PASSWORD_KEY = auth::authenticator::PASSWORD_KEY;
|
||||
|
||||
@@ -43,7 +43,7 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "auth/auth.hh"
|
||||
#include "auth/service.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -82,13 +82,14 @@ public:
|
||||
static std::atomic<bool> active;
|
||||
private:
|
||||
::shared_ptr<distributed<database>> _db;
|
||||
::shared_ptr<sharded<auth::service>> _auth_service;
|
||||
lw_shared_ptr<tmpdir> _data_dir;
|
||||
private:
|
||||
struct core_local_state {
|
||||
service::client_state client_state;
|
||||
|
||||
core_local_state()
|
||||
: client_state(service::client_state::for_external_calls())
|
||||
core_local_state(auth::service& auth_service)
|
||||
: client_state(service::client_state::for_external_calls(auth_service))
|
||||
{
|
||||
client_state.set_login(::make_shared<auth::authenticated_user>("cassandra"));
|
||||
}
|
||||
@@ -106,7 +107,7 @@ private:
|
||||
return ::make_shared<service::query_state>(_core_local.local().client_state);
|
||||
}
|
||||
public:
|
||||
single_node_cql_env(::shared_ptr<distributed<database>> db) : _db(db)
|
||||
single_node_cql_env(::shared_ptr<distributed<database>> db, ::shared_ptr<sharded<auth::service>> auth_service) : _db(db), _auth_service(std::move(auth_service))
|
||||
{ }
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(const sstring& text) override {
|
||||
@@ -241,8 +242,12 @@ public:
|
||||
return cql3::get_query_processor();
|
||||
}
|
||||
|
||||
auth::service& local_auth_service() override {
|
||||
return _auth_service->local();
|
||||
}
|
||||
|
||||
future<> start() {
|
||||
return _core_local.start();
|
||||
return _core_local.start(std::ref(*_auth_service));
|
||||
}
|
||||
|
||||
future<> stop() {
|
||||
@@ -292,8 +297,10 @@ public:
|
||||
ms.start(listen, std::move(7000)).get();
|
||||
auto stop_ms = defer([&ms] { ms.stop().get(); });
|
||||
|
||||
auto auth_service = ::make_shared<sharded<auth::service>>();
|
||||
|
||||
auto& ss = service::get_storage_service();
|
||||
ss.start(std::ref(*db)).get();
|
||||
ss.start(std::ref(*db), std::ref(*auth_service)).get();
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
|
||||
db->start(std::move(*cfg)).get();
|
||||
@@ -347,12 +354,12 @@ public:
|
||||
auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); });
|
||||
|
||||
service::get_local_storage_service().init_server().get();
|
||||
auto deinit_storage_service_server = defer([] {
|
||||
auto deinit_storage_service_server = defer([auth_service] {
|
||||
gms::stop_gossiping().get();
|
||||
auth::auth::shutdown().get();
|
||||
auth_service->stop().get();
|
||||
});
|
||||
|
||||
single_node_cql_env env(db);
|
||||
single_node_cql_env env(db, auth_service);
|
||||
env.start().get();
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
|
||||
@@ -38,6 +38,10 @@
|
||||
|
||||
class database;
|
||||
|
||||
namespace auth {
|
||||
class service;
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
@@ -87,6 +91,8 @@ public:
|
||||
virtual distributed<database>& db() = 0;
|
||||
|
||||
virtual distributed<cql3::query_processor> & qp() = 0;
|
||||
|
||||
virtual auth::service& local_auth_service() = 0;
|
||||
};
|
||||
|
||||
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func);
|
||||
|
||||
@@ -61,19 +61,20 @@ namespace bpo = boost::program_options;
|
||||
|
||||
int main(int ac, char ** av) {
|
||||
distributed<database> db;
|
||||
sharded<auth::service> auth_service;
|
||||
app_template app;
|
||||
app.add_options()
|
||||
("seed", bpo::value<std::vector<std::string>>(), "IP address of seed node")
|
||||
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen");
|
||||
return app.run_deprecated(ac, av, [&db, &app] {
|
||||
return app.run_deprecated(ac, av, [&auth_service, &db, &app] {
|
||||
auto config = app.configuration();
|
||||
logging::logger_registry().set_logger_level("gossip", logging::log_level::trace);
|
||||
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
|
||||
utils::fb_utilities::set_broadcast_address(listen);
|
||||
utils::fb_utilities::set_broadcast_rpc_address(listen);
|
||||
auto vv = std::make_shared<gms::versioned_value::factory>();
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&db] {
|
||||
return service::init_storage_service(db);
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&auth_service, &db] {
|
||||
return service::init_storage_service(db, auth_service);
|
||||
}).then([vv, listen, config] {
|
||||
return netw::get_messaging_service().start(listen);
|
||||
}).then([config] {
|
||||
|
||||
@@ -40,9 +40,10 @@ thread_local disk_error_signal_type general_disk_error;
|
||||
SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
return seastar::async([] {
|
||||
distributed<database> db;
|
||||
sharded<auth::service> auth_service;
|
||||
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
|
||||
service::get_storage_service().start(std::ref(db)).get();
|
||||
service::get_storage_service().start(std::ref(db), std::ref(auth_service)).get();
|
||||
db.start().get();
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
|
||||
gms::get_failure_detector().start().get();
|
||||
|
||||
@@ -24,17 +24,19 @@
|
||||
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include "auth/service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
|
||||
class storage_service_for_tests {
|
||||
distributed<database> _db;
|
||||
sharded<auth::service> _auth_service;
|
||||
public:
|
||||
storage_service_for_tests() {
|
||||
auto thread = seastar::thread_impl::get();
|
||||
assert(thread);
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
|
||||
service::get_storage_service().start(std::ref(_db)).get();
|
||||
service::get_storage_service().start(std::ref(_db), std::ref(_auth_service)).get();
|
||||
service::get_storage_service().invoke_on_all([] (auto& ss) {
|
||||
ss.enable_all_features();
|
||||
}).get();
|
||||
|
||||
@@ -198,10 +198,10 @@ private:
|
||||
});
|
||||
}
|
||||
public:
|
||||
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp)
|
||||
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service)
|
||||
: _db(db)
|
||||
, _query_processor(qp)
|
||||
, _query_state(service::client_state::for_external_thrift_calls())
|
||||
, _query_state(service::client_state::for_external_thrift_calls(auth_service))
|
||||
{ }
|
||||
|
||||
const sstring& current_keyspace() const {
|
||||
@@ -215,7 +215,8 @@ public:
|
||||
void login(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const AuthenticationRequest& auth_request) {
|
||||
with_cob(std::move(cob), std::move(exn_cob), [&] {
|
||||
auth::authenticator::credentials_map creds(auth_request.credentials.begin(), auth_request.credentials.end());
|
||||
return auth::authenticator::get().authenticate(creds).then([this] (auto user) {
|
||||
auto& auth_service = *_query_state.get_client_state().get_auth_service();
|
||||
return auth_service.underlying_authenticator().authenticate(creds).then([this] (auto user) {
|
||||
_query_state.get_client_state().set_login(std::move(user));
|
||||
});
|
||||
});
|
||||
@@ -1902,13 +1903,15 @@ private:
|
||||
class handler_factory : public CassandraCobSvIfFactory {
|
||||
distributed<database>& _db;
|
||||
distributed<cql3::query_processor>& _query_processor;
|
||||
auth::service& _auth_service;
|
||||
public:
|
||||
explicit handler_factory(distributed<database>& db,
|
||||
distributed<cql3::query_processor>& qp)
|
||||
: _db(db), _query_processor(qp) {}
|
||||
distributed<cql3::query_processor>& qp,
|
||||
auth::service& auth_service)
|
||||
: _db(db), _query_processor(qp), _auth_service(auth_service) {}
|
||||
typedef CassandraCobSvIf Handler;
|
||||
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
|
||||
return new thrift_handler(_db, _query_processor);
|
||||
return new thrift_handler(_db, _query_processor, _auth_service);
|
||||
}
|
||||
virtual void releaseHandler(CassandraCobSvIf* handler) {
|
||||
delete handler;
|
||||
@@ -1916,6 +1919,6 @@ public:
|
||||
};
|
||||
|
||||
std::unique_ptr<CassandraCobSvIfFactory>
|
||||
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp) {
|
||||
return std::make_unique<handler_factory>(db, qp);
|
||||
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service) {
|
||||
return std::make_unique<handler_factory>(db, qp, auth_service);
|
||||
}
|
||||
|
||||
@@ -23,11 +23,12 @@
|
||||
#define APPS_SEASTAR_THRIFT_HANDLER_HH_
|
||||
|
||||
#include "Cassandra.h"
|
||||
#include "auth/service.hh"
|
||||
#include "database.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include <memory>
|
||||
|
||||
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp);
|
||||
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&);
|
||||
|
||||
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
|
||||
|
||||
@@ -59,9 +59,10 @@ public:
|
||||
};
|
||||
|
||||
thrift_server::thrift_server(distributed<database>& db,
|
||||
distributed<cql3::query_processor>& qp)
|
||||
distributed<cql3::query_processor>& qp,
|
||||
auth::service& auth_service)
|
||||
: _stats(new thrift_stats(*this))
|
||||
, _handler_factory(create_handler_factory(db, qp).release())
|
||||
, _handler_factory(create_handler_factory(db, qp, auth_service).release())
|
||||
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
|
||||
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) {
|
||||
}
|
||||
|
||||
@@ -62,6 +62,10 @@ class TMemoryBuffer;
|
||||
|
||||
}}}
|
||||
|
||||
namespace auth {
|
||||
class service;
|
||||
}
|
||||
|
||||
class thrift_server {
|
||||
class connection : public boost::intrusive::list_base_hook<> {
|
||||
struct fake_transport;
|
||||
@@ -100,7 +104,7 @@ private:
|
||||
boost::intrusive::list<connection> _connections_list;
|
||||
seastar::gate _stop_gate;
|
||||
public:
|
||||
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp);
|
||||
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&);
|
||||
~thrift_server();
|
||||
future<> listen(ipv4_addr addr, bool keepalive);
|
||||
future<> stop();
|
||||
|
||||
@@ -260,13 +260,14 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb)
|
||||
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service& auth_service)
|
||||
: _proxy(proxy)
|
||||
, _query_processor(qp)
|
||||
, _max_request_size(memory::stats().total_memory() / 10)
|
||||
, _memory_available(_max_request_size)
|
||||
, _notifier(std::make_unique<event_notifier>())
|
||||
, _lb(lb)
|
||||
, _auth_service(auth_service)
|
||||
{
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
@@ -557,7 +558,7 @@ cql_server::connection::connection(cql_server& server, ipv4_addr server_addr, co
|
||||
, _fd(std::move(fd))
|
||||
, _read_buf(_fd.input())
|
||||
, _write_buf(_fd.output())
|
||||
, _client_state(service::client_state::external_tag{}, addr)
|
||||
, _client_state(service::client_state::external_tag{}, server._auth_service, addr)
|
||||
{
|
||||
++_server._total_connections;
|
||||
++_server._current_connections;
|
||||
@@ -748,7 +749,7 @@ future<response_type> cql_server::connection::process_startup(uint16_t stream, b
|
||||
throw exceptions::protocol_exception(sprint("Unknown compression algorithm: %s", compression));
|
||||
}
|
||||
}
|
||||
auto& a = auth::authenticator::get();
|
||||
auto& a = client_state.get_auth_service()->underlying_authenticator();
|
||||
if (a.require_authentication()) {
|
||||
return make_ready_future<response_type>(std::make_pair(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state()), client_state));
|
||||
}
|
||||
@@ -758,7 +759,7 @@ future<response_type> cql_server::connection::process_startup(uint16_t stream, b
|
||||
future<response_type> cql_server::connection::process_auth_response(uint16_t stream, bytes_view buf, service::client_state client_state)
|
||||
{
|
||||
if (_sasl_challenge == nullptr) {
|
||||
_sasl_challenge = auth::authenticator::get().new_sasl_challenge();
|
||||
_sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
|
||||
}
|
||||
|
||||
auto challenge = _sasl_challenge->evaluate_response(buf);
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "auth/service.hh"
|
||||
#include "core/reactor.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
@@ -118,8 +119,9 @@ private:
|
||||
uint64_t _unpaged_queries = 0;
|
||||
uint64_t _requests_serving = 0;
|
||||
cql_load_balance _lb;
|
||||
auth::service& _auth_service;
|
||||
public:
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service&);
|
||||
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive, ipv4_addr server_addr);
|
||||
future<> stop();
|
||||
|
||||
Reference in New Issue
Block a user