Merge "Free auth and its use from global variables" from Jesse

"This patch series addresses #2929. The objective is to eliminate global
state from the implementation and use of all access-control functionlity.

I've made every effort to make these patches logically independent and
incremental, but the final patch is big: this was necessary because
eliminating the global instances themselves is an atomic change."

* 'jhk/non_global_auth/v2' of https://github.com/hakuch/scylla:
  auth: Switch to sharded service
  tracing/trace_keyspace_helper: Use internal `client_state`
  auth: Make the QP an explicit dependency
  auth: Unify Java class name attributes
  auth: Make life-time control more consistent
  auth: Move metadata constants
  auth: Don't expose internal constant
  auth: Extract `permissions_cache`
  utils/loading_cache: Include necessary dependency
  auth: Fix static constant initialization
  auth: Extract `delayed_tasks` from `auth.cc`
This commit is contained in:
Avi Kivity
2017-11-16 14:52:34 +02:00
56 changed files with 1700 additions and 881 deletions

View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
namespace auth {
const sstring& allow_all_authenticator_name() {
static const sstring name = meta::AUTH_PACKAGE_NAME + "AllowAllAuthenticator";
return name;
}
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
allow_all_authenticator,
cql3::query_processor&,
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -0,0 +1,97 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdexcept>
#include "auth/authenticator.hh"
#include "auth/authenticated_user.hh"
#include "auth/common.hh"
namespace cql3 {
class query_processor;
}
namespace service {
class migration_manager;
}
namespace auth {
const sstring& allow_all_authenticator_name();
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::migration_manager&) {
}
future<> start() override {
return make_ready_future<>();
}
future<> stop() override {
return make_ready_future<>();
}
const sstring& qualified_java_name() const override {
return allow_all_authenticator_name();
}
bool require_authentication() const override {
return false;
}
option_set supported_options() const override {
return option_set();
}
option_set alterable_options() const override {
return option_set();
}
future<::shared_ptr<authenticated_user>> authenticate(const credentials_map& credentials) const override {
return make_ready_future<::shared_ptr<authenticated_user>>(::make_shared<authenticated_user>());
}
future<> create(sstring username, const option_map& options) override {
return make_ready_future();
}
future<> alter(sstring username, const option_map& options) override {
return make_ready_future();
}
future<> drop(sstring username) override {
return make_ready_future();
}
const resource_ids& protected_resources() const override {
static const resource_ids ids;
return ids;
}
::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
throw std::runtime_error("Should not reach");
}
};
}

View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "auth/allow_all_authorizer.hh"
#include "auth/common.hh"
#include "utils/class_registrator.hh"
namespace auth {
const sstring& allow_all_authorizer_name() {
static const sstring name = meta::AUTH_PACKAGE_NAME + "AllowAllAuthorizer";
return name;
}
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authorizer,
allow_all_authorizer,
cql3::query_processor&,
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthorizer");
}

View File

@@ -0,0 +1,98 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "authorizer.hh"
#include "exceptions/exceptions.hh"
#include "stdx.hh"
namespace cql3 {
class query_processor;
}
namespace service {
class migration_manager;
}
namespace auth {
class service;
const sstring& allow_all_authorizer_name();
class allow_all_authorizer final : public authorizer {
public:
allow_all_authorizer(cql3::query_processor&, ::service::migration_manager&) {
}
future<> start() override {
return make_ready_future<>();
}
future<> stop() override {
return make_ready_future<>();
}
const sstring& qualified_java_name() const override {
return allow_all_authorizer_name();
}
future<permission_set> authorize(service&, ::shared_ptr<authenticated_user>, data_resource) const override {
return make_ready_future<permission_set>(permissions::ALL);
}
future<> grant(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override {
throw exceptions::invalid_request_exception("GRANT operation is not supported by AllowAllAuthorizer");
}
future<> revoke(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override {
throw exceptions::invalid_request_exception("REVOKE operation is not supported by AllowAllAuthorizer");
}
future<std::vector<permission_details>> list(
service&,
::shared_ptr<authenticated_user> performer,
permission_set,
stdx::optional<data_resource>,
stdx::optional<sstring>) const override {
throw exceptions::invalid_request_exception("LIST PERMISSIONS operation is not supported by AllowAllAuthorizer");
}
future<> revoke_all(sstring dropped_user) override {
return make_ready_future();
}
future<> revoke_all(data_resource) override {
return make_ready_future();
}
const resource_ids& protected_resources() override {
static const resource_ids ids;
return ids;
}
future<> validate_configuration() const override {
return make_ready_future();
}
};
}

View File

@@ -1,377 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2016 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/sleep.hh>
#include <seastar/core/distributed.hh>
#include "auth.hh"
#include "authenticator.hh"
#include "authorizer.hh"
#include "database.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/statements/raw/cf_statement.hh"
#include "cql3/statements/create_table_statement.hh"
#include "db/config.hh"
#include "service/migration_manager.hh"
#include "utils/loading_cache.hh"
#include "utils/hash.hh"
const sstring auth::auth::DEFAULT_SUPERUSER_NAME("cassandra");
const sstring auth::auth::AUTH_KS("system_auth");
const sstring auth::auth::USERS_CF("users");
const sstring auth::auth::AUTH_PACKAGE_NAME("org.apache.cassandra.auth.");
static const sstring USER_NAME("name");
static const sstring SUPER("super");
static logging::logger alogger("auth");
// TODO: configurable
using namespace std::chrono_literals;
const std::chrono::milliseconds auth::auth::SUPERUSER_SETUP_DELAY = 10000ms;
class auth_migration_listener : public service::migration_listener {
void on_create_keyspace(const sstring& ks_name) override {}
void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_create_view(const sstring& ks_name, const sstring& view_name) override {}
void on_update_keyspace(const sstring& ks_name) override {}
void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
void on_drop_keyspace(const sstring& ks_name) override {
auth::authorizer::get().revoke_all(auth::data_resource(ks_name));
}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
auth::authorizer::get().revoke_all(auth::data_resource(ks_name, cf_name));
}
void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
};
static auth_migration_listener auth_migration;
namespace std {
template <>
struct hash<auth::data_resource> {
size_t operator()(const auth::data_resource & v) const {
return v.hash_value();
}
};
template <>
struct hash<auth::authenticated_user> {
size_t operator()(const auth::authenticated_user & v) const {
return utils::tuple_hash()(v.name(), v.is_anonymous());
}
};
}
class auth::auth::permissions_cache {
public:
typedef utils::loading_cache<std::pair<authenticated_user, data_resource>, permission_set, utils::loading_cache_reload_enabled::yes, utils::simple_entry_size<permission_set>, utils::tuple_hash> cache_type;
typedef typename cache_type::key_type key_type;
permissions_cache()
: permissions_cache(
cql3::get_local_query_processor().db().local().get_config()) {
}
permissions_cache(const db::config& cfg)
: _cache(cfg.permissions_cache_max_entries(), std::chrono::milliseconds(cfg.permissions_validity_in_ms()), std::chrono::milliseconds(cfg.permissions_update_interval_in_ms()), alogger,
[] (const key_type& k) {
alogger.debug("Refreshing permissions for {}", k.first.name());
return authorizer::get().authorize(::make_shared<authenticated_user>(k.first), k.second);
}) {}
future<> stop() {
return _cache.stop();
}
future<permission_set> get(::shared_ptr<authenticated_user> user, data_resource resource) {
return _cache.get(key_type(*user, std::move(resource)));
}
private:
cache_type _cache;
};
namespace std { // for ADL, yuch
std::ostream& operator<<(std::ostream& os, const std::pair<auth::authenticated_user, auth::data_resource>& p) {
os << "{user: " << p.first.name() << ", data_resource: " << p.second << "}";
return os;
}
}
static distributed<auth::auth::permissions_cache> perm_cache;
/**
* Poor mans job schedule. For maximum 2 jobs. Sic.
* Still does nothing more clever than waiting 10 seconds
* like origin, then runs the submitted tasks.
*
* Only difference compared to sleep (from which this
* borrows _heavily_) is that if tasks have not run by the time
* we exit (and do static clean up) we delete the promise + cont
*
* Should be abstracted to some sort of global server function
* probably.
*/
struct waiter {
promise<> done;
timer<> tmr;
waiter() : tmr([this] {done.set_value();})
{
tmr.arm(auth::auth::SUPERUSER_SETUP_DELAY);
}
~waiter() {
if (tmr.armed()) {
tmr.cancel();
done.set_exception(std::runtime_error("shutting down"));
}
alogger.trace("Deleting scheduled task");
}
void kill() {
}
};
typedef std::unique_ptr<waiter> waiter_ptr;
static std::vector<waiter_ptr> & thread_waiters() {
static thread_local std::vector<waiter_ptr> the_waiters;
return the_waiters;
}
void auth::auth::schedule_when_up(scheduled_func f) {
alogger.trace("Adding scheduled task");
auto & waiters = thread_waiters();
waiters.emplace_back(std::make_unique<waiter>());
auto* w = waiters.back().get();
w->done.get_future().finally([w] {
auto & waiters = thread_waiters();
auto i = std::find_if(waiters.begin(), waiters.end(), [w](const waiter_ptr& p) {
return p.get() == w;
});
if (i != waiters.end()) {
waiters.erase(i);
}
}).then([f = std::move(f)] {
alogger.trace("Running scheduled task");
return f();
}).handle_exception([](auto ep) {
return make_ready_future();
});
}
future<> auth::auth::setup() {
auto& db = cql3::get_local_query_processor().db().local();
auto& cfg = db.get_config();
future<> f = perm_cache.start();
qualified_name authenticator_name(AUTH_PACKAGE_NAME, cfg.authenticator()),
authorizer_name(AUTH_PACKAGE_NAME, cfg.authorizer());
if (authenticator::ALLOW_ALL_AUTHENTICATOR_NAME == authenticator_name && authorizer::ALLOW_ALL_AUTHORIZER_NAME == authorizer_name) {
// just create the objects
return f.then([authenticator_name = std::move(authenticator_name)] {
return authenticator::setup(authenticator_name);
}).then([authorizer_name = std::move(authorizer_name)] {
return authorizer::setup(authorizer_name);
});
}
if (!db.has_keyspace(AUTH_KS)) {
std::map<sstring, sstring> opts;
opts["replication_factor"] = "1";
auto ksm = keyspace_metadata::new_keyspace(AUTH_KS, "org.apache.cassandra.locator.SimpleStrategy", opts, true);
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments. See issue #2129.
f = service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false);
}
return f.then([] {
return setup_table(USERS_CF, sprint("CREATE TABLE %s.%s (%s text, %s boolean, PRIMARY KEY(%s)) WITH gc_grace_seconds=%d",
AUTH_KS, USERS_CF, USER_NAME, SUPER, USER_NAME,
90 * 24 * 60 * 60)); // 3 months.
}).then([authenticator_name = std::move(authenticator_name)] {
return authenticator::setup(authenticator_name);
}).then([authorizer_name = std::move(authorizer_name)] {
return authorizer::setup(authorizer_name);
}).then([] {
service::get_local_migration_manager().register_listener(&auth_migration); // again, only one shard...
// instead of once-timer, just schedule this later
schedule_when_up([] {
// setup default super user
return has_existing_users(USERS_CF, DEFAULT_SUPERUSER_NAME, USER_NAME).then([](bool exists) {
if (!exists) {
auto query = sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
AUTH_KS, USERS_CF, USER_NAME, SUPER);
cql3::get_local_query_processor().process(query, db::consistency_level::ONE, {DEFAULT_SUPERUSER_NAME, true}).then([](auto) {
alogger.info("Created default superuser '{}'", DEFAULT_SUPERUSER_NAME);
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::request_execution_exception&) {
alogger.warn("Skipped default superuser setup: some nodes were not ready");
}
});
}
});
});
});
}
future<> auth::auth::shutdown() {
// just make sure we don't have pending tasks.
// this is mostly relevant for test cases where
// db-env-shutdown != process shutdown
return smp::invoke_on_all([] {
thread_waiters().clear();
}).then([] {
return perm_cache.stop();
});
}
future<auth::permission_set> auth::auth::get_permissions(::shared_ptr<authenticated_user> user, data_resource resource) {
return perm_cache.local().get(std::move(user), std::move(resource));
}
static db::consistency_level consistency_for_user(const sstring& username) {
if (username == auth::auth::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM;
}
return db::consistency_level::LOCAL_ONE;
}
static future<::shared_ptr<cql3::untyped_result_set>> select_user(const sstring& username) {
// Here was a thread local, explicit cache of prepared statement. In normal execution this is
// fine, but since we in testing set up and tear down system over and over, we'd start using
// obsolete prepared statements pretty quickly.
// Rely on query processing caching statements instead, and lets assume
// that a map lookup string->statement is not gonna kill us much.
return cql3::get_local_query_processor().process(
sprint("SELECT * FROM %s.%s WHERE %s = ?",
auth::auth::AUTH_KS, auth::auth::USERS_CF,
USER_NAME), consistency_for_user(username),
{ username }, true);
}
future<bool> auth::auth::is_existing_user(const sstring& username) {
return select_user(username).then(
[](::shared_ptr<cql3::untyped_result_set> res) {
return make_ready_future<bool>(!res->empty());
});
}
future<bool> auth::auth::is_super_user(const sstring& username) {
return select_user(username).then(
[](::shared_ptr<cql3::untyped_result_set> res) {
return make_ready_future<bool>(!res->empty() && res->one().get_as<bool>(SUPER));
});
}
future<> auth::auth::insert_user(const sstring& username, bool is_super) {
return cql3::get_local_query_processor().process(sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?)",
AUTH_KS, USERS_CF, USER_NAME, SUPER),
consistency_for_user(username), { username, is_super }).discard_result();
}
future<> auth::auth::delete_user(const sstring& username) {
return cql3::get_local_query_processor().process(sprint("DELETE FROM %s.%s WHERE %s = ?",
AUTH_KS, USERS_CF, USER_NAME),
consistency_for_user(username), { username }).discard_result();
}
future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
auto& qp = cql3::get_local_query_processor();
auto& db = qp.db().local();
if (db.has_schema(AUTH_KS, name)) {
return make_ready_future();
}
::shared_ptr<cql3::statements::raw::cf_statement> parsed = static_pointer_cast<
cql3::statements::raw::cf_statement>(cql3::query_processor::parse_statement(cql));
parsed->prepare_keyspace(AUTH_KS);
::shared_ptr<cql3::statements::create_table_statement> statement =
static_pointer_cast<cql3::statements::create_table_statement>(
parsed->prepare(db, qp.get_cql_stats())->statement);
auto schema = statement->get_cf_meta_data();
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
schema_builder b(schema);
b.set_uuid(uuid);
return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
}
future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {
auto default_user_query = sprint("SELECT * FROM %s.%s WHERE %s = ?", AUTH_KS, cfname, name_column);
auto all_users_query = sprint("SELECT * FROM %s.%s LIMIT 1", AUTH_KS, cfname);
return cql3::get_local_query_processor().process(default_user_query, db::consistency_level::ONE, { def_user_name }).then([=](::shared_ptr<cql3::untyped_result_set> res) {
if (!res->empty()) {
return make_ready_future<bool>(true);
}
return cql3::get_local_query_processor().process(default_user_query, db::consistency_level::QUORUM, { def_user_name }).then([all_users_query](::shared_ptr<cql3::untyped_result_set> res) {
if (!res->empty()) {
return make_ready_future<bool>(true);
}
return cql3::get_local_query_processor().process(all_users_query, db::consistency_level::QUORUM).then([](::shared_ptr<cql3::untyped_result_set> res) {
return make_ready_future<bool>(!res->empty());
});
});
});
}

View File

@@ -1,124 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2016 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <chrono>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include "exceptions/exceptions.hh"
#include "permission.hh"
#include "data_resource.hh"
#include "authenticated_user.hh"
namespace auth {
class auth {
public:
class permissions_cache;
static const sstring DEFAULT_SUPERUSER_NAME;
static const sstring AUTH_KS;
static const sstring USERS_CF;
static const sstring AUTH_PACKAGE_NAME;
static const std::chrono::milliseconds SUPERUSER_SETUP_DELAY;
static future<permission_set> get_permissions(::shared_ptr<authenticated_user>, data_resource);
/**
* Checks if the username is stored in AUTH_KS.USERS_CF.
*
* @param username Username to query.
* @return whether or not Cassandra knows about the user.
*/
static future<bool> is_existing_user(const sstring& username);
/**
* Checks if the user is a known superuser.
*
* @param username Username to query.
* @return true is the user is a superuser, false if they aren't or don't exist at all.
*/
static future<bool> is_super_user(const sstring& username);
/**
* Inserts the user into AUTH_KS.USERS_CF (or overwrites their superuser status as a result of an ALTER USER query).
*
* @param username Username to insert.
* @param isSuper User's new status.
* @throws RequestExecutionException
*/
static future<> insert_user(const sstring& username, bool is_super);
/**
* Deletes the user from AUTH_KS.USERS_CF.
*
* @param username Username to delete.
* @throws RequestExecutionException
*/
static future<> delete_user(const sstring& username);
/**
* Sets up Authenticator and Authorizer.
*/
static future<> setup();
static future<> shutdown();
/**
* Set up table from given CREATE TABLE statement under system_auth keyspace, if not already done so.
*
* @param name name of the table
* @param cql CREATE TABLE statement
*/
static future<> setup_table(const sstring& name, const sstring& cql);
static future<bool> has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column_name);
// For internal use. Run function "when system is up".
typedef std::function<future<>()> scheduled_func;
static void schedule_when_up(scheduled_func);
};
}
std::ostream& operator<<(std::ostream& os, const std::pair<auth::authenticated_user, auth::data_resource>& p);

View File

@@ -41,7 +41,6 @@
#include "authenticated_user.hh"
#include "auth.hh"
const sstring auth::authenticated_user::ANONYMOUS_USERNAME("anonymous");
@@ -60,13 +59,6 @@ const sstring& auth::authenticated_user::name() const {
return _anon ? ANONYMOUS_USERNAME : _name;
}
future<bool> auth::authenticated_user::is_super() const {
if (is_anonymous()) {
return make_ready_future<bool>(false);
}
return auth::auth::is_super_user(_name);
}
bool auth::authenticated_user::operator==(const authenticated_user& v) const {
return _anon ? v._anon : _name == v._name;
}

View File

@@ -58,14 +58,6 @@ public:
const sstring& name() const;
/**
* Checks the user's superuser status.
* Only a superuser is allowed to perform CREATE USER and DROP USER queries.
* Im most cased, though not necessarily, a superuser will have Permission.ALL on every resource
* (depends on IAuthorizer implementation).
*/
future<bool> is_super() const;
/**
* If IAuthenticator doesn't require authentication, this method may return true.
*/

View File

@@ -41,14 +41,14 @@
#include "authenticator.hh"
#include "authenticated_user.hh"
#include "common.hh"
#include "password_authenticator.hh"
#include "auth.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
#include "utils/class_registrator.hh"
const sstring auth::authenticator::USERNAME_KEY("username");
const sstring auth::authenticator::PASSWORD_KEY("password");
const sstring auth::authenticator::ALLOW_ALL_AUTHENTICATOR_NAME(auth::AUTH_PACKAGE_NAME + "AllowAllAuthenticator");
auth::authenticator::option auth::authenticator::string_to_option(const sstring& name) {
if (strcasecmp(name.c_str(), "password") == 0) {
@@ -65,64 +65,3 @@ sstring auth::authenticator::option_to_string(option opt) {
throw std::invalid_argument(sprint("Unknown option {}", opt));
}
}
/**
* Authenticator is assumed to be a fully state-less immutable object (note all the const).
* We thus store a single instance globally, since it should be safe/ok.
*/
static std::unique_ptr<auth::authenticator> global_authenticator;
using authenticator_registry = class_registry<auth::authenticator>;
future<>
auth::authenticator::setup(const sstring& type) {
if (type == ALLOW_ALL_AUTHENTICATOR_NAME) {
class allow_all_authenticator : public authenticator {
public:
const sstring& class_name() const override {
return ALLOW_ALL_AUTHENTICATOR_NAME;
}
bool require_authentication() const override {
return false;
}
option_set supported_options() const override {
return option_set();
}
option_set alterable_options() const override {
return option_set();
}
future<::shared_ptr<authenticated_user>> authenticate(const credentials_map& credentials) const override {
return make_ready_future<::shared_ptr<authenticated_user>>(::make_shared<authenticated_user>());
}
future<> create(sstring username, const option_map& options) override {
return make_ready_future();
}
future<> alter(sstring username, const option_map& options) override {
return make_ready_future();
}
future<> drop(sstring username) override {
return make_ready_future();
}
const resource_ids& protected_resources() const override {
static const resource_ids ids;
return ids;
}
::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
throw std::runtime_error("Should not reach");
}
};
global_authenticator = std::make_unique<allow_all_authenticator>();
return make_ready_future();
} else {
auto a = authenticator_registry::create(type);
auto f = a->init();
return f.then([a = std::move(a)]() mutable {
global_authenticator = std::move(a);
});
}
}
auth::authenticator& auth::authenticator::get() {
assert(global_authenticator);
return *global_authenticator;
}

View File

@@ -69,7 +69,6 @@ class authenticator {
public:
static const sstring USERNAME_KEY;
static const sstring PASSWORD_KEY;
static const sstring ALLOW_ALL_AUTHENTICATOR_NAME;
/**
* Supported CREATE USER/ALTER USER options.
@@ -86,27 +85,14 @@ public:
using option_map = std::unordered_map<option, boost::any, enum_hash<option>>;
using credentials_map = std::unordered_map<sstring, sstring>;
/**
* Setup is called once upon system startup to initialize the IAuthenticator.
*
* For example, use this method to create any required keyspaces/column families.
* Note: Only call from main thread.
*/
static future<> setup(const sstring& type);
/**
* Returns the system authenticator. Must have called setup before calling this.
*/
static authenticator& get();
virtual ~authenticator()
{}
virtual future<> init() {
return make_ready_future();
}
virtual future<> start() = 0;
virtual const sstring& class_name() const = 0;
virtual future<> stop() = 0;
virtual const sstring& qualified_java_name() const = 0;
/**
* Whether or not the authenticator requires explicit login.

View File

@@ -41,25 +41,39 @@
#include "authorizer.hh"
#include "authenticated_user.hh"
#include "common.hh"
#include "default_authorizer.hh"
#include "auth.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
#include "utils/class_registrator.hh"
const sstring auth::authorizer::ALLOW_ALL_AUTHORIZER_NAME(auth::AUTH_PACKAGE_NAME + "AllowAllAuthorizer");
const sstring& auth::allow_all_authorizer_name() {
static const sstring name = meta::AUTH_PACKAGE_NAME + "AllowAllAuthorizer";
return name;
}
/**
* Authenticator is assumed to be a fully state-less immutable object (note all the const).
* We thus store a single instance globally, since it should be safe/ok.
*/
static std::unique_ptr<auth::authorizer> global_authorizer;
using authorizer_registry = class_registry<auth::authorizer>;
using authorizer_registry = class_registry<auth::authorizer, cql3::query_processor&>;
future<>
auth::authorizer::setup(const sstring& type) {
if (type == ALLOW_ALL_AUTHORIZER_NAME) {
if (type == allow_all_authorizer_name()) {
class allow_all_authorizer : public authorizer {
public:
future<> start() override {
return make_ready_future<>();
}
future<> stop() override {
return make_ready_future<>();
}
const sstring& qualified_java_name() const override {
return allow_all_authorizer_name();
}
future<permission_set> authorize(::shared_ptr<authenticated_user>, data_resource) const override {
return make_ready_future<permission_set>(permissions::ALL);
}
@@ -90,8 +104,8 @@ auth::authorizer::setup(const sstring& type) {
global_authorizer = std::make_unique<allow_all_authorizer>();
return make_ready_future();
} else {
auto a = authorizer_registry::create(type);
auto f = a->init();
auto a = authorizer_registry::create(type, cql3::get_local_query_processor());
auto f = a->start();
return f.then([a = std::move(a)]() mutable {
global_authorizer = std::move(a);
});

View File

@@ -55,6 +55,8 @@
namespace auth {
class service;
class authenticated_user;
struct permission_details {
@@ -71,13 +73,13 @@ using std::experimental::optional;
class authorizer {
public:
static const sstring ALLOW_ALL_AUTHORIZER_NAME;
virtual ~authorizer() {}
virtual future<> init() {
return make_ready_future();
}
virtual future<> start() = 0;
virtual future<> stop() = 0;
virtual const sstring& qualified_java_name() const = 0;
/**
* The primary Authorizer method. Returns a set of permissions of a user on a resource.
@@ -86,7 +88,7 @@ public:
* @param resource Resource for which the authorization is being requested. @see DataResource.
* @return Set of permissions of the user on the resource. Should never return empty. Use permission.NONE instead.
*/
virtual future<permission_set> authorize(::shared_ptr<authenticated_user>, data_resource) const = 0;
virtual future<permission_set> authorize(service&, ::shared_ptr<authenticated_user>, data_resource) const = 0;
/**
* Grants a set of permissions on a resource to a user.
@@ -130,7 +132,7 @@ public:
* @throws RequestValidationException
* @throws RequestExecutionException
*/
virtual future<std::vector<permission_details>> list(::shared_ptr<authenticated_user> performer, permission_set, optional<data_resource>, optional<sstring>) const = 0;
virtual future<std::vector<permission_details>> list(service&, ::shared_ptr<authenticated_user> performer, permission_set, optional<data_resource>, optional<sstring>) const = 0;
/**
* This method is called before deleting a user with DROP USER query so that a new user with the same
@@ -160,18 +162,6 @@ public:
* @throws ConfigurationException when there is a configuration error.
*/
virtual future<> validate_configuration() const = 0;
/**
* Setup is called once upon system startup to initialize the IAuthorizer.
*
* For example, use this method to create any required keyspaces/column families.
*/
static future<> setup(const sstring& type);
/**
* Returns the system authorizer. Must have called setup before calling this.
*/
static authorizer& get();
};
}

70
auth/common.cc Normal file
View File

@@ -0,0 +1,70 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "auth/common.hh"
#include <seastar/core/shared_ptr.hh>
#include "cql3/query_processor.hh"
#include "cql3/statements/create_table_statement.hh"
#include "schema_builder.hh"
#include "service/migration_manager.hh"
namespace auth {
namespace meta {
const sstring DEFAULT_SUPERUSER_NAME("cassandra");
const sstring AUTH_KS("system_auth");
const sstring USERS_CF("users");
const sstring AUTH_PACKAGE_NAME("org.apache.cassandra.auth.");
}
future<> create_metadata_table_if_missing(
const sstring& table_name,
cql3::query_processor& qp,
const sstring& cql,
::service::migration_manager& mm) {
auto& db = qp.db().local();
if (db.has_schema(meta::AUTH_KS, table_name)) {
return make_ready_future<>();
}
auto parsed_statement = static_pointer_cast<cql3::statements::raw::cf_statement>(
cql3::query_processor::parse_statement(cql));
parsed_statement->prepare_keyspace(meta::AUTH_KS);
auto statement = static_pointer_cast<cql3::statements::create_table_statement>(
parsed_statement->prepare(db, qp.get_cql_stats())->statement);
const auto schema = statement->get_cf_meta_data();
const auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
schema_builder b(schema);
b.set_uuid(uuid);
return mm.announce_new_column_family(b.build(), false);
}
}

74
auth/common.hh Normal file
View File

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <chrono>
#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/resource.hh>
#include <seastar/core/sstring.hh>
#include "delayed_tasks.hh"
#include "seastarx.hh"
namespace service {
class migration_manager;
}
namespace cql3 {
class query_processor;
}
namespace auth {
namespace meta {
extern const sstring DEFAULT_SUPERUSER_NAME;
extern const sstring AUTH_KS;
extern const sstring USERS_CF;
extern const sstring AUTH_PACKAGE_NAME;
}
template <class Task>
future<> once_among_shards(Task&& f) {
if (engine().cpu_id() == 0u) {
return f();
}
return make_ready_future<>();
}
template <class Task, class Clock>
void delay_until_system_ready(delayed_tasks<Clock>& ts, Task&& f) {
static const typename std::chrono::milliseconds delay_duration(10000);
ts.schedule_after(delay_duration, std::forward<Task>(f));
}
future<> create_metadata_table_if_missing(
const sstring& table_name,
cql3::query_processor&,
const sstring& cql,
::service::migration_manager&);
}

View File

@@ -46,7 +46,7 @@
#include <seastar/core/reactor.hh>
#include "auth.hh"
#include "common.hh"
#include "default_authorizer.hh"
#include "authenticated_user.hh"
#include "permission.hh"
@@ -55,8 +55,10 @@
#include "exceptions/exceptions.hh"
#include "log.hh"
const sstring auth::default_authorizer::DEFAULT_AUTHORIZER_NAME(
auth::AUTH_PACKAGE_NAME + "CassandraAuthorizer");
const sstring& auth::default_authorizer_name() {
static const sstring name = meta::AUTH_PACKAGE_NAME + "CassandraAuthorizer";
return name;
}
static const sstring USER_NAME = "username";
static const sstring RESOURCE_NAME = "resource";
@@ -64,31 +66,48 @@ static const sstring PERMISSIONS_NAME = "permissions";
static const sstring PERMISSIONS_CF = "permissions";
static logging::logger alogger("default_authorizer");
static const class_registrator<auth::authorizer, auth::default_authorizer> password_auth_reg(
auth::default_authorizer::DEFAULT_AUTHORIZER_NAME);
auth::default_authorizer::default_authorizer() {
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
auth::authorizer,
auth::default_authorizer,
cql3::query_processor&,
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.CassandraAuthorizer");
auth::default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::migration_manager& mm)
: _qp(qp)
, _migration_manager(mm) {
}
auth::default_authorizer::~default_authorizer() {
}
future<> auth::default_authorizer::init() {
sstring create_table = sprint("CREATE TABLE %s.%s ("
future<> auth::default_authorizer::start() {
static const sstring create_table = sprint("CREATE TABLE %s.%s ("
"%s text,"
"%s text,"
"%s set<text>,"
"PRIMARY KEY(%s, %s)"
") WITH gc_grace_seconds=%d", auth::auth::AUTH_KS,
") WITH gc_grace_seconds=%d", meta::AUTH_KS,
PERMISSIONS_CF, USER_NAME, RESOURCE_NAME, PERMISSIONS_NAME,
USER_NAME, RESOURCE_NAME, 90 * 24 * 60 * 60); // 3 months.
return auth::setup_table(PERMISSIONS_CF, create_table);
return auth::once_among_shards([this] {
return auth::create_metadata_table_if_missing(
PERMISSIONS_CF,
_qp,
create_table,
_migration_manager);
});
}
future<> auth::default_authorizer::stop() {
return make_ready_future<>();
}
future<auth::permission_set> auth::default_authorizer::authorize(
::shared_ptr<authenticated_user> user, data_resource resource) const {
return user->is_super().then([this, user, resource = std::move(resource)](bool is_super) {
service& ser, ::shared_ptr<authenticated_user> user, data_resource resource) const {
return auth::is_super_user(ser, *user).then([this, user, resource = std::move(resource)](bool is_super) {
if (is_super) {
return make_ready_future<permission_set>(permissions::ALL);
}
@@ -97,10 +116,9 @@ future<auth::permission_set> auth::default_authorizer::authorize(
* TOOD: could create actual data type for permission (translating string<->perm),
* but this seems overkill right now. We still must store strings so...
*/
auto& qp = cql3::get_local_query_processor();
auto query = sprint("SELECT %s FROM %s.%s WHERE %s = ? AND %s = ?"
, PERMISSIONS_NAME, auth::AUTH_KS, PERMISSIONS_CF, USER_NAME, RESOURCE_NAME);
return qp.process(query, db::consistency_level::LOCAL_ONE, {user->name(), resource.name() })
, PERMISSIONS_NAME, meta::AUTH_KS, PERMISSIONS_CF, USER_NAME, RESOURCE_NAME);
return _qp.process(query, db::consistency_level::LOCAL_ONE, {user->name(), resource.name() })
.then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
auto res = f.get0();
@@ -123,11 +141,10 @@ future<> auth::default_authorizer::modify(
::shared_ptr<authenticated_user> performer, permission_set set,
data_resource resource, sstring user, sstring op) {
// TODO: why does this not check super user?
auto& qp = cql3::get_local_query_processor();
auto query = sprint("UPDATE %s.%s SET %s = %s %s ? WHERE %s = ? AND %s = ?",
auth::AUTH_KS, PERMISSIONS_CF, PERMISSIONS_NAME,
meta::AUTH_KS, PERMISSIONS_CF, PERMISSIONS_NAME,
PERMISSIONS_NAME, op, USER_NAME, RESOURCE_NAME);
return qp.process(query, db::consistency_level::ONE, {
return _qp.process(query, db::consistency_level::ONE, {
permissions::to_strings(set), user, resource.name() }).discard_result();
}
@@ -145,15 +162,14 @@ future<> auth::default_authorizer::revoke(
}
future<std::vector<auth::permission_details>> auth::default_authorizer::list(
::shared_ptr<authenticated_user> performer, permission_set set,
service& ser, ::shared_ptr<authenticated_user> performer, permission_set set,
optional<data_resource> resource, optional<sstring> user) const {
return performer->is_super().then([this, performer, set = std::move(set), resource = std::move(resource), user = std::move(user)](bool is_super) {
return auth::is_super_user(ser, *performer).then([this, performer, set = std::move(set), resource = std::move(resource), user = std::move(user)](bool is_super) {
if (!is_super && (!user || performer->name() != *user)) {
throw exceptions::unauthorized_exception(sprint("You are not authorized to view %s's permissions", user ? *user : "everyone"));
}
auto query = sprint("SELECT %s, %s, %s FROM %s.%s", USER_NAME, RESOURCE_NAME, PERMISSIONS_NAME, auth::AUTH_KS, PERMISSIONS_CF);
auto& qp = cql3::get_local_query_processor();
auto query = sprint("SELECT %s, %s, %s FROM %s.%s", USER_NAME, RESOURCE_NAME, PERMISSIONS_NAME, meta::AUTH_KS, PERMISSIONS_CF);
// Oh, look, it is a case where it does not pay off to have
// parameters to process in an initializer list.
@@ -161,15 +177,15 @@ future<std::vector<auth::permission_details>> auth::default_authorizer::list(
if (resource && user) {
query += sprint(" WHERE %s = ? AND %s = ?", USER_NAME, RESOURCE_NAME);
f = qp.process(query, db::consistency_level::ONE, {*user, resource->name()});
f = _qp.process(query, db::consistency_level::ONE, {*user, resource->name()});
} else if (resource) {
query += sprint(" WHERE %s = ? ALLOW FILTERING", RESOURCE_NAME);
f = qp.process(query, db::consistency_level::ONE, {resource->name()});
f = _qp.process(query, db::consistency_level::ONE, {resource->name()});
} else if (user) {
query += sprint(" WHERE %s = ?", USER_NAME);
f = qp.process(query, db::consistency_level::ONE, {*user});
f = _qp.process(query, db::consistency_level::ONE, {*user});
} else {
f = qp.process(query, db::consistency_level::ONE, {});
f = _qp.process(query, db::consistency_level::ONE, {});
}
return f.then([set](::shared_ptr<cql3::untyped_result_set> res) {
@@ -191,10 +207,9 @@ future<std::vector<auth::permission_details>> auth::default_authorizer::list(
}
future<> auth::default_authorizer::revoke_all(sstring dropped_user) {
auto& qp = cql3::get_local_query_processor();
auto query = sprint("DELETE FROM %s.%s WHERE %s = ?", auth::AUTH_KS,
auto query = sprint("DELETE FROM %s.%s WHERE %s = ?", meta::AUTH_KS,
PERMISSIONS_CF, USER_NAME);
return qp.process(query, db::consistency_level::ONE, { dropped_user }).discard_result().handle_exception(
return _qp.process(query, db::consistency_level::ONE, { dropped_user }).discard_result().handle_exception(
[dropped_user](auto ep) {
try {
std::rethrow_exception(ep);
@@ -205,17 +220,16 @@ future<> auth::default_authorizer::revoke_all(sstring dropped_user) {
}
future<> auth::default_authorizer::revoke_all(data_resource resource) {
auto& qp = cql3::get_local_query_processor();
auto query = sprint("SELECT %s FROM %s.%s WHERE %s = ? ALLOW FILTERING",
USER_NAME, auth::AUTH_KS, PERMISSIONS_CF, RESOURCE_NAME);
return qp.process(query, db::consistency_level::LOCAL_ONE, { resource.name() })
.then_wrapped([resource, &qp](future<::shared_ptr<cql3::untyped_result_set>> f) {
USER_NAME, meta::AUTH_KS, PERMISSIONS_CF, RESOURCE_NAME);
return _qp.process(query, db::consistency_level::LOCAL_ONE, { resource.name() })
.then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
auto res = f.get0();
return parallel_for_each(res->begin(), res->end(), [&qp, res, resource](const cql3::untyped_result_set::row& r) {
return parallel_for_each(res->begin(), res->end(), [this, res, resource](const cql3::untyped_result_set::row& r) {
auto query = sprint("DELETE FROM %s.%s WHERE %s = ? AND %s = ?"
, auth::AUTH_KS, PERMISSIONS_CF, USER_NAME, RESOURCE_NAME);
return qp.process(query, db::consistency_level::LOCAL_ONE, { r.get_as<sstring>(USER_NAME), resource.name() })
, meta::AUTH_KS, PERMISSIONS_CF, USER_NAME, RESOURCE_NAME);
return _qp.process(query, db::consistency_level::LOCAL_ONE, { r.get_as<sstring>(USER_NAME), resource.name() })
.discard_result().handle_exception([resource](auto ep) {
try {
std::rethrow_exception(ep);
@@ -234,7 +248,7 @@ future<> auth::default_authorizer::revoke_all(data_resource resource) {
const auth::resource_ids& auth::default_authorizer::protected_resources() {
static const resource_ids ids({ data_resource(auth::AUTH_KS, PERMISSIONS_CF) });
static const resource_ids ids({ data_resource(meta::AUTH_KS, PERMISSIONS_CF) });
return ids;
}

View File

@@ -41,26 +41,40 @@
#pragma once
#include <functional>
#include "authorizer.hh"
#include "cql3/query_processor.hh"
#include "service/migration_manager.hh"
namespace auth {
class default_authorizer : public authorizer {
public:
static const sstring DEFAULT_AUTHORIZER_NAME;
const sstring& default_authorizer_name();
default_authorizer();
class default_authorizer : public authorizer {
cql3::query_processor& _qp;
::service::migration_manager& _migration_manager;
public:
default_authorizer(cql3::query_processor&, ::service::migration_manager&);
~default_authorizer();
future<> init() override;
future<> start() override;
future<permission_set> authorize(::shared_ptr<authenticated_user>, data_resource) const override;
future<> stop() override;
const sstring& qualified_java_name() const override {
return default_authorizer_name();
}
future<permission_set> authorize(service&, ::shared_ptr<authenticated_user>, data_resource) const override;
future<> grant(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override;
future<> revoke(::shared_ptr<authenticated_user>, permission_set, data_resource, sstring) override;
future<std::vector<permission_details>> list(::shared_ptr<authenticated_user>, permission_set, optional<data_resource>, optional<sstring>) const override;
future<std::vector<permission_details>> list(service&, ::shared_ptr<authenticated_user>, permission_set, optional<data_resource>, optional<sstring>) const override;
future<> revoke_all(sstring) override;

View File

@@ -46,33 +46,42 @@
#include <seastar/core/reactor.hh>
#include "auth.hh"
#include "common.hh"
#include "password_authenticator.hh"
#include "authenticated_user.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "log.hh"
#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
const sstring auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME(auth::AUTH_PACKAGE_NAME + "PasswordAuthenticator");
const sstring& auth::password_authenticator_name() {
static const sstring name = meta::AUTH_PACKAGE_NAME + "PasswordAuthenticator";
return name;
}
// name of the hash column.
static const sstring SALTED_HASH = "salted_hash";
static const sstring USER_NAME = "username";
static const sstring DEFAULT_USER_NAME = auth::auth::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = auth::auth::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_NAME = auth::meta::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = auth::meta::DEFAULT_SUPERUSER_NAME;
static const sstring CREDENTIALS_CF = "credentials";
static logging::logger plogger("password_authenticator");
static const class_registrator<auth::authenticator, auth::password_authenticator> password_auth_reg(
auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME);
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
auth::authenticator,
auth::password_authenticator,
cql3::query_processor&,
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
auth::password_authenticator::~password_authenticator()
{}
auth::password_authenticator::password_authenticator()
{}
auth::password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
: _qp(qp)
, _migration_manager(mm) {
}
// TODO: blowfish
// Origin uses Java bcrypt library, i.e. blowfish salt
@@ -144,39 +153,52 @@ static sstring hashpw(const sstring& pass) {
return hashpw(pass, gensalt());
}
future<> auth::password_authenticator::init() {
gensalt(); // do this once to determine usable hashing
future<> auth::password_authenticator::start() {
return auth::once_among_shards([this] {
gensalt(); // do this once to determine usable hashing
sstring create_table = sprint(
"CREATE TABLE %s.%s ("
"%s text,"
"%s text," // salt + hash + number of rounds
"options map<text,text>,"// for future extensions
"PRIMARY KEY(%s)"
") WITH gc_grace_seconds=%d",
auth::auth::AUTH_KS,
CREDENTIALS_CF, USER_NAME, SALTED_HASH, USER_NAME,
90 * 24 * 60 * 60); // 3 months.
static const sstring create_table = sprint(
"CREATE TABLE %s.%s ("
"%s text,"
"%s text," // salt + hash + number of rounds
"options map<text,text>,"// for future extensions
"PRIMARY KEY(%s)"
") WITH gc_grace_seconds=%d",
meta::AUTH_KS,
CREDENTIALS_CF, USER_NAME, SALTED_HASH, USER_NAME,
90 * 24 * 60 * 60); // 3 months.
return auth::setup_table(CREDENTIALS_CF, create_table).then([this] {
// instead of once-timer, just schedule this later
auth::schedule_when_up([] {
return auth::has_existing_users(CREDENTIALS_CF, DEFAULT_USER_NAME, USER_NAME).then([](bool exists) {
if (!exists) {
cql3::get_local_query_processor().process(sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
auth::AUTH_KS,
CREDENTIALS_CF,
USER_NAME, SALTED_HASH
),
db::consistency_level::ONE, {DEFAULT_USER_NAME, hashpw(DEFAULT_USER_PASSWORD)}).then([](auto) {
plogger.info("Created default user '{}'", DEFAULT_USER_NAME);
});
}
return auth::create_metadata_table_if_missing(
CREDENTIALS_CF,
_qp,
create_table,
_migration_manager).then([this] {
auth::delay_until_system_ready(_delayed, [this] {
return has_existing_users().then([this](bool existing) {
if (!existing) {
return _qp.process(
sprint(
"INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
meta::AUTH_KS,
CREDENTIALS_CF,
USER_NAME, SALTED_HASH),
db::consistency_level::ONE,
{ DEFAULT_USER_NAME, hashpw(DEFAULT_USER_PASSWORD) }).then([](auto) {
plogger.info("Created default user '{}'", DEFAULT_USER_NAME);
});
}
return make_ready_future<>();
});
});
});
});
}
future<> auth::password_authenticator::stop() {
return make_ready_future<>();
}
db::consistency_level auth::password_authenticator::consistency_for_user(const sstring& username) {
if (username == DEFAULT_USER_NAME) {
return db::consistency_level::QUORUM;
@@ -184,8 +206,8 @@ db::consistency_level auth::password_authenticator::consistency_for_user(const s
return db::consistency_level::LOCAL_ONE;
}
const sstring& auth::password_authenticator::class_name() const {
return PASSWORD_AUTHENTICATOR_NAME;
const sstring& auth::password_authenticator::qualified_java_name() const {
return password_authenticator_name();
}
bool auth::password_authenticator::require_authentication() const {
@@ -218,9 +240,8 @@ future<::shared_ptr<auth::authenticated_user> > auth::password_authenticator::au
// Rely on query processing caching statements instead, and lets assume
// that a map lookup string->statement is not gonna kill us much.
return futurize_apply([this, username, password] {
auto& qp = cql3::get_local_query_processor();
return qp.process(sprint("SELECT %s FROM %s.%s WHERE %s = ?", SALTED_HASH,
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME),
return _qp.process(sprint("SELECT %s FROM %s.%s WHERE %s = ?", SALTED_HASH,
meta::AUTH_KS, CREDENTIALS_CF, USER_NAME),
consistency_for_user(username), {username}, true);
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
@@ -244,9 +265,8 @@ future<> auth::password_authenticator::create(sstring username,
try {
auto password = boost::any_cast<sstring>(options.at(option::PASSWORD));
auto query = sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?)",
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME, SALTED_HASH);
auto& qp = cql3::get_local_query_processor();
return qp.process(query, consistency_for_user(username), { username, hashpw(password) }).discard_result();
meta::AUTH_KS, CREDENTIALS_CF, USER_NAME, SALTED_HASH);
return _qp.process(query, consistency_for_user(username), { username, hashpw(password) }).discard_result();
} catch (std::out_of_range&) {
throw exceptions::invalid_request_exception("PasswordAuthenticator requires PASSWORD option");
}
@@ -257,9 +277,8 @@ future<> auth::password_authenticator::alter(sstring username,
try {
auto password = boost::any_cast<sstring>(options.at(option::PASSWORD));
auto query = sprint("UPDATE %s.%s SET %s = ? WHERE %s = ?",
auth::AUTH_KS, CREDENTIALS_CF, SALTED_HASH, USER_NAME);
auto& qp = cql3::get_local_query_processor();
return qp.process(query, consistency_for_user(username), { hashpw(password), username }).discard_result();
meta::AUTH_KS, CREDENTIALS_CF, SALTED_HASH, USER_NAME);
return _qp.process(query, consistency_for_user(username), { hashpw(password), username }).discard_result();
} catch (std::out_of_range&) {
throw exceptions::invalid_request_exception("PasswordAuthenticator requires PASSWORD option");
}
@@ -268,23 +287,24 @@ future<> auth::password_authenticator::alter(sstring username,
future<> auth::password_authenticator::drop(sstring username) {
try {
auto query = sprint("DELETE FROM %s.%s WHERE %s = ?",
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME);
auto& qp = cql3::get_local_query_processor();
return qp.process(query, consistency_for_user(username), { username }).discard_result();
meta::AUTH_KS, CREDENTIALS_CF, USER_NAME);
return _qp.process(query, consistency_for_user(username), { username }).discard_result();
} catch (std::out_of_range&) {
throw exceptions::invalid_request_exception("PasswordAuthenticator requires PASSWORD option");
}
}
const auth::resource_ids& auth::password_authenticator::protected_resources() const {
static const resource_ids ids({ data_resource(auth::AUTH_KS, CREDENTIALS_CF) });
static const resource_ids ids({ data_resource(meta::AUTH_KS, CREDENTIALS_CF) });
return ids;
}
::shared_ptr<auth::authenticator::sasl_challenge> auth::password_authenticator::new_sasl_challenge() const {
class plain_text_password_challenge: public sasl_challenge {
const password_authenticator& _self;
public:
plain_text_password_challenge()
plain_text_password_challenge(const password_authenticator& self) : _self(self)
{}
/**
@@ -339,11 +359,58 @@ const auth::resource_ids& auth::password_authenticator::protected_resources() co
return _complete;
}
future<::shared_ptr<authenticated_user>> get_authenticated_user() const override {
return authenticator::get().authenticate(_credentials);
return _self.authenticate(_credentials);
}
private:
credentials_map _credentials;
bool _complete = false;
};
return ::make_shared<plain_text_password_challenge>();
return ::make_shared<plain_text_password_challenge>(*this);
}
//
// Similar in structure to `auth::service::has_existing_users()`, but trying to generalize the pattern breaks all kinds
// of module boundaries and leaks implementation details.
//
future<bool> auth::password_authenticator::has_existing_users() const {
static const sstring default_user_query = sprint(
"SELECT * FROM %s.%s WHERE %s = ?",
meta::AUTH_KS,
CREDENTIALS_CF,
USER_NAME);
static const sstring all_users_query = sprint(
"SELECT * FROM %s.%s LIMIT 1",
meta::AUTH_KS,
CREDENTIALS_CF);
// This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we
// can potentially avoid doing a range query with a high consistency level.
return _qp.process(
default_user_query,
db::consistency_level::ONE,
{ meta::DEFAULT_SUPERUSER_NAME },
true).then([this](auto results) {
if (!results->empty()) {
return make_ready_future<bool>(true);
}
return _qp.process(
default_user_query,
db::consistency_level::QUORUM,
{ meta::DEFAULT_SUPERUSER_NAME },
true).then([this](auto results) {
if (!results->empty()) {
return make_ready_future<bool>(true);
}
return _qp.process(
all_users_query,
db::consistency_level::QUORUM).then([](auto results) {
return make_ready_future<bool>(!results->empty());
});
});
});
}

View File

@@ -42,19 +42,33 @@
#pragma once
#include "authenticator.hh"
#include "cql3/query_processor.hh"
#include "delayed_tasks.hh"
namespace service {
class migration_manager;
}
namespace auth {
class password_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
const sstring& password_authenticator_name();
password_authenticator();
class password_authenticator : public authenticator {
cql3::query_processor& _qp;
::service::migration_manager& _migration_manager;
delayed_tasks<> _delayed{};
public:
password_authenticator(cql3::query_processor&, ::service::migration_manager&);
~password_authenticator();
future<> init() override;
future<> start() override;
const sstring& class_name() const override;
future<> stop() override;
const sstring& qualified_java_name() const override;
bool require_authentication() const override;
option_set supported_options() const override;
option_set alterable_options() const override;
@@ -67,6 +81,9 @@ public:
static db::consistency_level consistency_for_user(const sstring& username);
private:
future<bool> has_existing_users() const;
};
}

51
auth/permissions_cache.cc Normal file
View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "auth/permissions_cache.hh"
#include "auth/authorizer.hh"
#include "auth/common.hh"
#include "auth/service.hh"
#include "db/config.hh"
namespace auth {
permissions_cache_config permissions_cache_config::from_db_config(const db::config& dc) {
permissions_cache_config c;
c.max_entries = dc.permissions_cache_max_entries();
c.validity_period = std::chrono::milliseconds(dc.permissions_validity_in_ms());
c.update_period = std::chrono::milliseconds(dc.permissions_update_interval_in_ms());
return c;
}
permissions_cache::permissions_cache(const permissions_cache_config& c, service& ser, logging::logger& log)
: _cache(c.max_entries, c.validity_period, c.update_period, log, [&ser, &log](const key_type& k) {
log.debug("Refreshing permissions for {}", k.first.name());
return ser.underlying_authorizer().authorize(ser, ::make_shared<authenticated_user>(k.first), k.second);
}) {
}
future<permission_set> permissions_cache::get(::shared_ptr<authenticated_user> user, data_resource r) {
return _cache.get(key_type(*user, r));
}
}

103
auth/permissions_cache.hh Normal file
View File

@@ -0,0 +1,103 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <chrono>
#include <functional>
#include <iostream>
#include <utility>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include "auth/authenticated_user.hh"
#include "auth/data_resource.hh"
#include "auth/permission.hh"
#include "log.hh"
#include "utils/loading_cache.hh"
namespace std {
template <>
struct hash<auth::data_resource> final {
size_t operator()(const auth::data_resource & v) const {
return v.hash_value();
}
};
template <>
struct hash<auth::authenticated_user> final {
size_t operator()(const auth::authenticated_user & v) const {
return utils::tuple_hash()(v.name(), v.is_anonymous());
}
};
inline std::ostream& operator<<(std::ostream& os, const std::pair<auth::authenticated_user, auth::data_resource>& p) {
os << "{user: " << p.first.name() << ", data_resource: " << p.second << "}";
return os;
}
}
namespace db {
class config;
}
namespace auth {
class service;
struct permissions_cache_config final {
static permissions_cache_config from_db_config(const db::config&);
std::size_t max_entries;
std::chrono::milliseconds validity_period;
std::chrono::milliseconds update_period;
};
class permissions_cache final {
using cache_type = utils::loading_cache<
std::pair<authenticated_user, data_resource>,
permission_set,
utils::loading_cache_reload_enabled::yes,
utils::simple_entry_size<permission_set>,
utils::tuple_hash>;
using key_type = typename cache_type::key_type;
cache_type _cache;
public:
explicit permissions_cache(const permissions_cache_config&, service&, logging::logger&);
future<> start() {
return make_ready_future<>();
}
future <> stop() {
return _cache.stop();
}
future<permission_set> get(::shared_ptr<authenticated_user>, data_resource);
};
}

353
auth/service.cc Normal file
View File

@@ -0,0 +1,353 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "auth/service.hh"
#include <map>
#include <seastar/core/future-util.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include "auth/allow_all_authenticator.hh"
#include "auth/allow_all_authorizer.hh"
#include "auth/common.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/config.hh"
#include "db/consistency_level.hh"
#include "exceptions/exceptions.hh"
#include "log.hh"
#include "service/migration_listener.hh"
#include "utils/class_registrator.hh"
namespace auth {
namespace meta {
static const sstring user_name_col_name("name");
static const sstring superuser_col_name("super");
}
static logging::logger log("auth_service");
class auth_migration_listener final : public ::service::migration_listener {
authorizer& _authorizer;
public:
explicit auth_migration_listener(authorizer& a) : _authorizer(a) {
}
private:
void on_create_keyspace(const sstring& ks_name) override {}
void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_create_view(const sstring& ks_name, const sstring& view_name) override {}
void on_update_keyspace(const sstring& ks_name) override {}
void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
void on_drop_keyspace(const sstring& ks_name) override {
_authorizer.revoke_all(auth::data_resource(ks_name));
}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
_authorizer.revoke_all(auth::data_resource(ks_name, cf_name));
}
void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
};
static sharded<permissions_cache> sharded_permissions_cache{};
static db::consistency_level consistency_for_user(const sstring& name) {
if (name == meta::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM;
} else {
return db::consistency_level::LOCAL_ONE;
}
}
static future<::shared_ptr<cql3::untyped_result_set>> select_user(cql3::query_processor& qp, const sstring& name) {
// Here was a thread local, explicit cache of prepared statement. In normal execution this is
// fine, but since we in testing set up and tear down system over and over, we'd start using
// obsolete prepared statements pretty quickly.
// Rely on query processing caching statements instead, and lets assume
// that a map lookup string->statement is not gonna kill us much.
return qp.process(
sprint(
"SELECT * FROM %s.%s WHERE %s = ?",
meta::AUTH_KS,
meta::USERS_CF,
meta::user_name_col_name),
consistency_for_user(name),
{ name },
true);
}
service_config service_config::from_db_config(const db::config& dc) {
const qualified_name qualified_authorizer_name(meta::AUTH_PACKAGE_NAME, dc.authorizer());
const qualified_name qualified_authenticator_name(meta::AUTH_PACKAGE_NAME, dc.authenticator());
service_config c;
c.authorizer_java_name = qualified_authorizer_name;
c.authenticator_java_name = qualified_authenticator_name;
return c;
}
service::service(
permissions_cache_config c,
cql3::query_processor& qp,
::service::migration_manager& mm,
std::unique_ptr<authorizer> a,
std::unique_ptr<authenticator> b)
: _cache_config(std::move(c))
, _qp(qp)
, _migration_manager(mm)
, _authorizer(std::move(a))
, _authenticator(std::move(b))
, _migration_listener(std::make_unique<auth_migration_listener>(*_authorizer)) {
}
service::service(
permissions_cache_config cache_config,
cql3::query_processor& qp,
::service::migration_manager& mm,
const service_config& sc)
: service(
std::move(cache_config),
qp,
mm,
create_object<authorizer>(sc.authorizer_java_name, qp, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, mm)) {
}
bool service::should_create_metadata() const {
const bool null_authorizer = _authorizer->qualified_java_name() == allow_all_authorizer_name();
const bool null_authenticator = _authenticator->qualified_java_name() == allow_all_authenticator_name();
return !null_authorizer || !null_authenticator;
}
future<> service::create_metadata_if_missing() {
auto& db = _qp.db().local();
auto f = make_ready_future<>();
if (!db.has_keyspace(meta::AUTH_KS)) {
std::map<sstring, sstring> opts{{"replication_factor", "1"}};
auto ksm = keyspace_metadata::new_keyspace(
meta::AUTH_KS,
"org.apache.cassandra.locator.SimpleStrategy",
opts,
true);
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments.
// See issue #2129.
f = _migration_manager.announce_new_keyspace(ksm, api::min_timestamp, false);
}
return f.then([this] {
// 3 months.
static const auto gc_grace_seconds = 90 * 24 * 60 * 60;
static const sstring users_table_query = sprint(
"CREATE TABLE %s.%s (%s text, %s boolean, PRIMARY KEY (%s)) WITH gc_grace_seconds=%s",
meta::AUTH_KS,
meta::USERS_CF,
meta::user_name_col_name,
meta::superuser_col_name,
meta::user_name_col_name,
gc_grace_seconds);
return create_metadata_table_if_missing(
meta::USERS_CF,
_qp,
users_table_query,
_migration_manager);
}).then([this] {
delay_until_system_ready(_delayed, [this] {
return has_existing_users().then([this](bool existing) {
if (!existing) {
//
// Create default superuser.
//
static const sstring query = sprint(
"INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
meta::AUTH_KS,
meta::USERS_CF,
meta::user_name_col_name,
meta::superuser_col_name);
return _qp.process(
query,
db::consistency_level::ONE,
{ meta::DEFAULT_SUPERUSER_NAME, true }).then([](auto&&) {
log.info("Created default superuser '{}'", meta::DEFAULT_SUPERUSER_NAME);
}).handle_exception([](auto exn) {
try {
std::rethrow_exception(exn);
} catch (const exceptions::request_execution_exception&) {
log.warn("Skipped default superuser setup: some nodes were not ready");
}
}).discard_result();
}
return make_ready_future<>();
});
});
return make_ready_future<>();
});
}
future<> service::start() {
return once_among_shards([this] {
if (should_create_metadata()) {
return create_metadata_if_missing();
}
return make_ready_future<>();
}).then([this] {
return when_all_succeed(_authorizer->start(), _authenticator->start());
}).then([this] {
return once_among_shards([this] {
_migration_manager.register_listener(_migration_listener.get());
return sharded_permissions_cache.start(std::ref(_cache_config), std::ref(*this), std::ref(log));
});
});
}
future<> service::stop() {
return once_among_shards([this] {
_delayed.cancel_all();
return sharded_permissions_cache.stop();
}).then([this] {
return when_all_succeed(_authorizer->stop(), _authenticator->stop());
});
}
future<bool> service::has_existing_users() const {
static const sstring default_user_query = sprint(
"SELECT * FROM %s.%s WHERE %s = ?",
meta::AUTH_KS,
meta::USERS_CF,
meta::user_name_col_name);
static const sstring all_users_query = sprint(
"SELECT * FROM %s.%s LIMIT 1",
meta::AUTH_KS,
meta::USERS_CF);
// This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we
// can potentially avoid doing a range query with a high consistency level.
return _qp.process(
default_user_query,
db::consistency_level::ONE,
{ meta::DEFAULT_SUPERUSER_NAME },
true).then([this](auto results) {
if (!results->empty()) {
return make_ready_future<bool>(true);
}
return _qp.process(
default_user_query,
db::consistency_level::QUORUM,
{ meta::DEFAULT_SUPERUSER_NAME },
true).then([this](auto results) {
if (!results->empty()) {
return make_ready_future<bool>(true);
}
return _qp.process(
all_users_query,
db::consistency_level::QUORUM).then([](auto results) {
return make_ready_future<bool>(!results->empty());
});
});
});
}
future<bool> service::is_existing_user(const sstring& name) const {
return select_user(_qp, name).then([](auto results) {
return !results->empty();
});
}
future<bool> service::is_super_user(const sstring& name) const {
return select_user(_qp, name).then([](auto results) {
return !results->empty() && results->one().template get_as<bool>(meta::superuser_col_name);
});
}
future<> service::insert_user(const sstring& name, bool is_superuser) {
return _qp.process(
sprint(
"INSERT INTO %s.%s (%s, %s) VALUES (?, ?)",
meta::AUTH_KS,
meta::USERS_CF,
meta::user_name_col_name,
meta::superuser_col_name),
consistency_for_user(name),
{ name, is_superuser }).discard_result();
}
future<> service::delete_user(const sstring& name) {
return _qp.process(
sprint(
"DELETE FROM %s.%s WHERE %s = ?",
meta::AUTH_KS,
meta::USERS_CF,
meta::user_name_col_name),
consistency_for_user(name),
{ name }).discard_result();
}
future<permission_set> service::get_permissions(::shared_ptr<authenticated_user> u, data_resource r) const {
return sharded_permissions_cache.local().get(std::move(u), std::move(r));
}
//
// Free functions.
//
future<bool> is_super_user(const service& ser, const authenticated_user& u) {
if (u.is_anonymous()) {
return make_ready_future<bool>(false);
}
return ser.is_super_user(u.name());
}
}

132
auth/service.hh Normal file
View File

@@ -0,0 +1,132 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/authenticated_user.hh"
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "delayed_tasks.hh"
#include "seastarx.hh"
namespace cql3 {
class query_processor;
}
namespace db {
class config;
}
namespace service {
class migration_manager;
class migration_listener;
}
namespace auth {
class authenticator;
class authorizer;
struct service_config final {
static service_config from_db_config(const db::config&);
sstring authorizer_java_name;
sstring authenticator_java_name;
};
class service final {
permissions_cache_config _cache_config;
cql3::query_processor& _qp;
::service::migration_manager& _migration_manager;
std::unique_ptr<authorizer> _authorizer;
std::unique_ptr<authenticator> _authenticator;
// Only one of these should be registered, so we end up with some unused instances. Not the end of the world.
std::unique_ptr<::service::migration_listener> _migration_listener;
delayed_tasks<> _delayed{};
public:
service(
permissions_cache_config,
cql3::query_processor&,
::service::migration_manager&,
std::unique_ptr<authorizer>,
std::unique_ptr<authenticator>);
service(
permissions_cache_config,
cql3::query_processor&,
::service::migration_manager&,
const service_config&);
future<> start();
future<> stop();
future<bool> is_existing_user(const sstring& name) const;
future<bool> is_super_user(const sstring& name) const;
future<> insert_user(const sstring& name, bool is_superuser);
future<> delete_user(const sstring& name);
future<permission_set> get_permissions(::shared_ptr<authenticated_user>, data_resource) const;
authenticator& underlying_authenticator() {
return *_authenticator;
}
const authenticator& underlying_authenticator() const {
return *_authenticator;
}
authorizer& underlying_authorizer() {
return *_authorizer;
}
const authorizer& underlying_authorizer() const {
return *_authorizer;
}
private:
future<bool> has_existing_users() const;
bool should_create_metadata() const;
future<> create_metadata_if_missing();
};
future<bool> is_super_user(const service&, const authenticated_user&);
}

View File

@@ -46,32 +46,44 @@
#include "password_authenticator.hh"
#include "default_authorizer.hh"
#include "permission.hh"
#include "auth.hh"
#include "db/config.hh"
#include "utils/class_registrator.hh"
namespace auth {
class service;
static const sstring PACKAGE_NAME("com.scylladb.auth.");
static const sstring TRANSITIONAL_AUTHENTICATOR_NAME(PACKAGE_NAME + "TransitionalAuthenticator");
static const sstring TRANSITIONAL_AUTHORIZER_NAME(PACKAGE_NAME + "TransitionalAuthorizer");
static const sstring& transitional_authenticator_name() {
static const sstring name = PACKAGE_NAME + "TransitionalAuthenticator";
return name;
}
static const sstring& transitional_authorizer_name() {
static const sstring name = PACKAGE_NAME + "TransitionalAuthorizer";
return name;
}
class transitional_authenticator : public authenticator {
std::unique_ptr<authenticator> _authenticator;
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator()
: transitional_authenticator(std::make_unique<password_authenticator>())
transitional_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, mm))
{}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a))
{}
future<> init() override {
return _authenticator->init();
future<> start() override {
return _authenticator->start();
}
const sstring& class_name() const override {
return TRANSITIONAL_AUTHENTICATOR_NAME;
future<> stop() override {
return _authenticator->stop();
}
const sstring& qualified_java_name() const override {
return transitional_authenticator_name();
}
bool require_authentication() const override {
return true;
@@ -142,19 +154,25 @@ public:
class transitional_authorizer : public authorizer {
std::unique_ptr<authorizer> _authorizer;
public:
transitional_authorizer()
: transitional_authorizer(std::make_unique<default_authorizer>())
transitional_authorizer(cql3::query_processor& qp, ::service::migration_manager& mm)
: transitional_authorizer(std::make_unique<default_authorizer>(qp, mm))
{}
transitional_authorizer(std::unique_ptr<authorizer> a)
: _authorizer(std::move(a))
{}
~transitional_authorizer()
{}
future<> init() override {
return _authorizer->init();
future<> start() override {
return _authorizer->start();
}
future<permission_set> authorize(::shared_ptr<authenticated_user> user, data_resource resource) const override {
return user->is_super().then([](bool s) {
future<> stop() override {
return _authorizer->stop();
}
const sstring& qualified_java_name() const override {
return transitional_authorizer_name();
}
future<permission_set> authorize(service& ser, ::shared_ptr<authenticated_user> user, data_resource resource) const override {
return is_super_user(ser, *user).then([](bool s) {
static const permission_set transitional_permissions =
permission_set::of<permission::CREATE,
permission::ALTER, permission::DROP,
@@ -169,8 +187,8 @@ public:
future<> revoke(::shared_ptr<authenticated_user> user, permission_set ps, data_resource r, sstring s) override {
return _authorizer->revoke(std::move(user), std::move(ps), std::move(r), std::move(s));
}
future<std::vector<permission_details>> list(::shared_ptr<authenticated_user> user, permission_set ps, optional<data_resource> r, optional<sstring> s) const override {
return _authorizer->list(std::move(user), std::move(ps), std::move(r), std::move(s));
future<std::vector<permission_details>> list(service& ser, ::shared_ptr<authenticated_user> user, permission_set ps, optional<data_resource> r, optional<sstring> s) const override {
return _authorizer->list(ser, std::move(user), std::move(ps), std::move(r), std::move(s));
}
future<> revoke_all(sstring s) override {
return _authorizer->revoke_all(std::move(s));
@@ -188,9 +206,18 @@ public:
}
static const class_registrator<auth::authenticator,
auth::transitional_authenticator> transitional_authenticator_reg(
auth::TRANSITIONAL_AUTHENTICATOR_NAME);
//
// To ensure correct initialization order, we unfortunately need to use string literals.
//
static const class_registrator<auth::authorizer, auth::transitional_authorizer> transitional_authorizer_reg(
auth::TRANSITIONAL_AUTHORIZER_NAME);
static const class_registrator<
auth::authenticator,
auth::transitional_authenticator,
cql3::query_processor&,
::service::migration_manager&> transitional_authenticator_reg("com.scylladb.auth.TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,
auth::transitional_authorizer,
cql3::query_processor&,
::service::migration_manager&> transitional_authorizer_reg("com.scylladb.auth.TransitionalAuthorizer");

View File

@@ -524,14 +524,17 @@ scylla_core = (['database.cc',
'lister.cc',
'repair/repair.cc',
'exceptions/exceptions.cc',
'auth/auth.cc',
'auth/allow_all_authenticator.cc',
'auth/allow_all_authorizer.cc',
'auth/authenticated_user.cc',
'auth/authenticator.cc',
'auth/authorizer.cc',
'auth/common.cc',
'auth/default_authorizer.cc',
'auth/data_resource.cc',
'auth/password_authenticator.cc',
'auth/permission.cc',
'auth/permissions_cache.cc',
'auth/service.cc',
'auth/transitional.cc',
'tracing/tracing.cc',
'tracing/trace_keyspace_helper.cc',

View File

@@ -42,8 +42,8 @@
#include <boost/range/adaptor/map.hpp>
#include "alter_user_statement.hh"
#include "auth/auth.hh"
#include "auth/authenticator.hh"
#include "auth/service.hh"
cql3::statements::alter_user_statement::alter_user_statement(sstring username, ::shared_ptr<user_options> opts, std::experimental::optional<bool> superuser)
: _username(std::move(username))
@@ -52,7 +52,7 @@ cql3::statements::alter_user_statement::alter_user_statement(sstring username, :
{}
void cql3::statements::alter_user_statement::validate(distributed<service::storage_proxy>& proxy, const service::client_state& state) {
_opts->validate();
_opts->validate(state.get_auth_service()->underlying_authenticator());
if (!_superuser && _opts->empty()) {
throw exceptions::invalid_request_exception("ALTER USER can't be empty");
@@ -73,7 +73,10 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
// my disgust.
throw exceptions::unauthorized_exception("You aren't allowed to alter your own superuser status");
}
return user->is_super().then([this, user](bool is_super) {
const auto& auth_service = *state.get_auth_service();
return auth::is_super_user(auth_service, *user).then([this, user, &auth_service](bool is_super) {
if (_superuser && !is_super) {
throw exceptions::unauthorized_exception("Only superusers are allowed to alter superuser status");
}
@@ -84,7 +87,7 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
if (!is_super) {
for (auto o : _opts->options() | boost::adaptors::map_keys) {
if (!auth::authenticator::get().alterable_options().contains(o)) {
if (!auth_service.underlying_authenticator().alterable_options().contains(o)) {
throw exceptions::unauthorized_exception(sprint("You aren't allowed to alter {} option", o));
}
}
@@ -94,14 +97,17 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::alter_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return auth::auth::is_existing_user(_username).then([this](bool exists) {
auto& client_state = state.get_client_state();
auto& auth_service = *client_state.get_auth_service();
return auth_service.is_existing_user(_username).then([this, &auth_service](bool exists) {
if (!exists) {
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
}
auto f = _opts->options().empty() ? make_ready_future() : auth::authenticator::get().alter(_username, _opts->options());
auto f = _opts->options().empty() ? make_ready_future() : auth_service.underlying_authenticator().alter(_username, _opts->options());
if (_superuser) {
f = f.then([this] {
return auth::auth::insert_user(_username, *_superuser);
f = f.then([this, &auth_service] {
return auth_service.insert_user(_username, *_superuser);
});
}
return f.then([] { return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(); });

View File

@@ -40,8 +40,8 @@
*/
#include "create_user_statement.hh"
#include "auth/auth.hh"
#include "auth/authenticator.hh"
#include "auth/service.hh"
cql3::statements::create_user_statement::create_user_statement(sstring username, ::shared_ptr<user_options> opts, bool superuser, bool if_not_exists)
: _username(std::move(username))
@@ -55,7 +55,7 @@ void cql3::statements::create_user_statement::validate(distributed<service::stor
throw exceptions::invalid_request_exception("Username can't be an empty string");
}
_opts->validate();
_opts->validate(state.get_auth_service()->underlying_authenticator());
// validate login here before checkAccess to avoid leaking user existence to anonymous users.
state.ensure_not_anonymous();
@@ -66,19 +66,22 @@ void cql3::statements::create_user_statement::validate(distributed<service::stor
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::create_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return state.get_client_state().user()->is_super().then([this](bool is_super) {
auto& client_state = state.get_client_state();
auto& auth_service = *client_state.get_auth_service();
return auth::is_super_user(auth_service, *client_state.user()).then([this, &auth_service](bool is_super) {
if (!is_super) {
throw exceptions::unauthorized_exception("Only superusers are allowed to perform CREATE USER queries");
}
return auth::auth::is_existing_user(_username).then([this](bool exists) {
return auth_service.is_existing_user(_username).then([this, &auth_service](bool exists) {
if (exists && !_if_not_exists) {
throw exceptions::invalid_request_exception(sprint("User %s already exists", _username));
}
if (exists && _if_not_exists) {
make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
}
return auth::authenticator::get().create(_username, _opts->options()).then([this] {
return auth::auth::insert_user(_username, _superuser).then([] {
return auth_service.underlying_authenticator().create(_username, _opts->options()).then([this, &auth_service] {
return auth_service.insert_user(_username, _superuser).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
});

View File

@@ -42,9 +42,9 @@
#include <boost/range/adaptor/map.hpp>
#include "drop_user_statement.hh"
#include "auth/auth.hh"
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/service.hh"
cql3::statements::drop_user_statement::drop_user_statement(sstring username, bool if_exists)
: _username(std::move(username))
@@ -65,12 +65,15 @@ void cql3::statements::drop_user_statement::validate(distributed<service::storag
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::drop_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return state.get_client_state().user()->is_super().then([this](bool is_super) {
auto& client_state = state.get_client_state();
auto& auth_service = *client_state.get_auth_service();
return auth::is_super_user(auth_service, *client_state.user()).then([this, &auth_service](bool is_super) {
if (!is_super) {
throw exceptions::unauthorized_exception("Only superusers are allowed to perform DROP USER queries");
}
return auth::auth::is_existing_user(_username).then([this](bool exists) {
return auth_service.is_existing_user(_username).then([this, &auth_service](bool exists) {
if (!_if_exists && !exists) {
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
}
@@ -79,9 +82,9 @@ cql3::statements::drop_user_statement::execute(distributed<service::storage_prox
}
// clean up permissions after the dropped user.
return auth::authorizer::get().revoke_all(_username).then([this] {
return auth::auth::delete_user(_username).then([this] {
return auth::authenticator::get().drop(_username);
return auth_service.underlying_authorizer().revoke_all(_username).then([this, &auth_service] {
return auth_service.delete_user(_username).then([this, &auth_service] {
return auth_service.underlying_authenticator().drop(_username);
});
}).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();

View File

@@ -44,7 +44,10 @@
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::grant_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return auth::authorizer::get().grant(state.get_client_state().user(), _permissions, _resource, _username).then([] {
auto& client_state = state.get_client_state();
auto& auth_service = *client_state.get_auth_service();
return auth_service.underlying_authorizer().grant(client_state.user(), _permissions, _resource, _username).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
}

View File

@@ -44,7 +44,7 @@
#include "list_permissions_statement.hh"
#include "auth/authorizer.hh"
#include "auth/auth.hh"
#include "auth/common.hh"
#include "cql3/result_set.hh"
#include "transport/messages/result_message.hh"
@@ -64,7 +64,7 @@ void cql3::statements::list_permissions_statement::validate(distributed<service:
future<> cql3::statements::list_permissions_statement::check_access(const service::client_state& state) {
auto f = make_ready_future();
if (_username) {
f = auth::auth::is_existing_user(*_username).then([this](bool exists) {
f = state.get_auth_service()->is_existing_user(*_username).then([this](bool exists) {
if (!exists) {
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", *_username));
}
@@ -84,7 +84,7 @@ future<> cql3::statements::list_permissions_statement::check_access(const servic
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::list_permissions_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
static auto make_column = [](sstring name) {
return ::make_shared<column_specification>(auth::auth::AUTH_KS, "permissions", ::make_shared<column_identifier>(std::move(name), true), utf8_type);
return ::make_shared<column_specification>(auth::meta::AUTH_KS, "permissions", ::make_shared<column_identifier>(std::move(name), true), utf8_type);
};
static thread_local const std::vector<::shared_ptr<column_specification>> metadata({
make_column("username"), make_column("resource"), make_column("permission")
@@ -104,7 +104,8 @@ cql3::statements::list_permissions_statement::execute(distributed<service::stora
}
return map_reduce(resources, [&state, this](opt_resource r) {
return auth::authorizer::get().list(state.get_client_state().user(), _permissions, std::move(r), _username);
auto& auth_service = *state.get_client_state().get_auth_service();
return auth_service.underlying_authorizer().list(auth_service, state.get_client_state().user(), _permissions, std::move(r), _username);
}, std::vector<auth::permission_details>(), [](std::vector<auth::permission_details> details, std::vector<auth::permission_details> pd) {
details.insert(details.end(), pd.begin(), pd.end());
return std::move(details);

View File

@@ -42,7 +42,7 @@
#include "list_users_statement.hh"
#include "cql3/query_processor.hh"
#include "cql3/query_options.hh"
#include "auth/auth.hh"
#include "auth/common.hh"
void cql3::statements::list_users_statement::validate(distributed<service::storage_proxy>& proxy, const service::client_state& state) {
}
@@ -57,7 +57,7 @@ cql3::statements::list_users_statement::execute(distributed<service::storage_pro
auto is = std::make_unique<service::query_state>(service::client_state::for_internal_calls());
auto io = std::make_unique<query_options>(db::consistency_level::QUORUM, std::vector<cql3::raw_value>{});
auto f = get_local_query_processor().process(
sprint("SELECT * FROM %s.%s", auth::auth::AUTH_KS,
auth::auth::USERS_CF), *is, *io);
sprint("SELECT * FROM %s.%s", auth::meta::AUTH_KS,
auth::meta::USERS_CF), *is, *io);
return f.finally([is = std::move(is), io = std::move(io)] {});
}

View File

@@ -41,11 +41,11 @@
#include <seastar/core/thread.hh>
#include "auth/service.hh"
#include "permission_altering_statement.hh"
#include "cql3/query_processor.hh"
#include "cql3/query_options.hh"
#include "cql3/selection/selection.hh"
#include "auth/auth.hh"
cql3::statements::permission_altering_statement::permission_altering_statement(
auth::permission_set permissions, auth::data_resource resource,
@@ -60,7 +60,7 @@ void cql3::statements::permission_altering_statement::validate(distributed<servi
}
future<> cql3::statements::permission_altering_statement::check_access(const service::client_state& state) {
return auth::auth::is_existing_user(_username).then([this, &state](bool exists) {
return state.get_auth_service()->is_existing_user(_username).then([this, &state](bool exists) {
if (!exists) {
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
}

View File

@@ -44,7 +44,10 @@
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::revoke_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return auth::authorizer::get().revoke(state.get_client_state().user(), _permissions, _resource, _username).then([] {
auto& client_state = state.get_client_state();
auto& auth_service = *client_state.get_auth_service();
return auth_service.underlying_authorizer().revoke(client_state.user(), _permissions, _resource, _username).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
}

View File

@@ -49,13 +49,12 @@ void cql3::user_options::put(const sstring& name, const sstring& value) {
_options[auth::authenticator::string_to_option(name)] = value;
}
void cql3::user_options::validate() const {
auto& a = auth::authenticator::get();
void cql3::user_options::validate(const auth::authenticator& a) const {
for (auto o : _options | boost::adaptors::map_keys) {
if (!a.supported_options().contains(o)) {
throw exceptions::invalid_request_exception(
sprint("%s doesn't support %s option",
a.class_name(),
a.qualified_java_name(),
a.option_to_string(o)));
}
}

View File

@@ -42,6 +42,10 @@
#include "auth/authenticator.hh"
namespace auth {
class authenticator;
}
namespace cql3 {
class user_options {
@@ -56,7 +60,7 @@ public:
const auth::authenticator::option_map& options() const {
return _options;
}
void validate() const;
void validate(const auth::authenticator&) const;
};
}

107
delayed_tasks.hh Normal file
View File

@@ -0,0 +1,107 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <algorithm>
#include <chrono>
#include <memory>
#include <list>
#include <stdexcept>
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
#include "log.hh"
#include "seastarx.hh"
//
// Delay asynchronous tasks.
//
template <class Clock = std::chrono::steady_clock>
class delayed_tasks final {
static logging::logger _logger;
// A waiter is destroyed before the timer has elapsed.
class cancelled : public std::exception {
public:
};
class waiter final {
timer<Clock> _timer;
promise<> _done{};
public:
explicit waiter(typename Clock::duration d) : _timer([this] { _done.set_value(); }) {
_timer.arm(d);
}
~waiter() {
if (_timer.armed()) {
_timer.cancel();
_done.set_exception(cancelled());
}
}
future<> get_future() noexcept {
return _done.get_future();
}
};
// `std::list` because iterators are not invalidated. We assume that look-up time is not a bottleneck.
std::list<std::unique_ptr<waiter>> _waiters{};
public:
//
// Schedule the task `f` after d` has elapsed. If the instance goes out of scope before
// the duration has elapsed, then the task is cancelled.
//
template <class Rep, class Period, class Task>
void schedule_after(std::chrono::duration<Rep, Period> d, Task&& f) {
_logger.trace("Adding scheduled task.");
auto iter = _waiters.insert(_waiters.end(), std::make_unique<waiter>(d));
auto& w = *iter;
w->get_future().then([this, f = std::move(f)] {
_logger.trace("Running scheduled task.");
return f();
}).then([this, iter] {
// We'll only get here if the instance is still alive, since otherwise the future will be resolved to
// `cancelled`.
_waiters.erase(iter);
}).template handle_exception_type([](const cancelled&) {
// Nothing.
return make_ready_future<>();
});
}
//
// Cancel all scheduled tasks.
//
void cancel_all() {
_waiters.clear();
}
};
template <class Clock>
logging::logger delayed_tasks<Clock>::_logger("delayed_tasks");

View File

@@ -34,8 +34,8 @@ logging::logger startlog("init");
// duplicated in cql_test_env.cc
// until proper shutdown is done.
void init_storage_service(distributed<database>& db) {
service::init_storage_service(db).get();
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
service::init_storage_service(db, auth_service).get();
// #293 - do not stop anything
//engine().at_exit([] { return service::deinit_storage_service(); });
}

View File

@@ -23,6 +23,7 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/distributed.hh>
#include "auth/service.hh"
#include "db/config.hh"
#include "database.hh"
#include "log.hh"
@@ -31,7 +32,7 @@ extern logging::logger startlog;
class bad_configuration_error : public std::exception {};
void init_storage_service(distributed<database>& db);
void init_storage_service(distributed<database>& db, sharded<auth::service>&);
void init_ms_fd_gossiper(sstring listen_address
, uint16_t storage_port
, uint16_t ssl_storage_port

View File

@@ -436,8 +436,9 @@ int main(int ac, char** av) {
api::set_server_init(ctx).get();
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
static sharded<auth::service> auth_service;
supervisor::notify("initializing storage service");
init_storage_service(db);
init_storage_service(db, auth_service);
supervisor::notify("starting per-shard database core");
// Note: changed from using a move here, because we want the config object intact.
db.start(std::ref(*cfg)).get();

View File

@@ -40,9 +40,9 @@
*/
#include "client_state.hh"
#include "auth/auth.hh"
#include "auth/authorizer.hh"
#include "auth/authenticator.hh"
#include "auth/common.hh"
#include "exceptions/exceptions.hh"
#include "validation.hh"
#include "db/system_keyspace.hh"
@@ -61,7 +61,7 @@ future<> service::client_state::check_user_exists() {
return make_ready_future();
}
return auth::auth::is_existing_user(_user->name()).then([user = _user](bool exists) mutable {
return _auth_service->is_existing_user(_user->name()).then([user = _user](bool exists) mutable {
if (!exists) {
throw exceptions::authentication_exception(
sprint("User %s doesn't exist - create it with CREATE USER query first",
@@ -138,7 +138,7 @@ future<> service::client_state::has_access(const sstring& ks, auth::permission p
}
// we want to allow altering AUTH_KS and TRACING_KS.
for (auto& n : { auth::auth::AUTH_KS, tracing::trace_keyspace_helper::KEYSPACE_NAME }) {
for (auto& n : { auth::meta::AUTH_KS, tracing::trace_keyspace_helper::KEYSPACE_NAME }) {
if (name == n && p == auth::permission::DROP) {
throw exceptions::unauthorized_exception(sprint("Cannot %s %s", auth::permissions::to_string(p), resource));
}
@@ -160,8 +160,8 @@ future<> service::client_state::has_access(const sstring& ks, auth::permission p
return make_ready_future();
}
if (auth::permissions::ALTERATIONS.contains(p)) {
for (auto& s : { auth::authorizer::get().protected_resources(),
auth::authenticator::get().protected_resources() }) {
for (auto& s : { _auth_service->underlying_authorizer().protected_resources(),
_auth_service->underlying_authorizer().protected_resources() }) {
if (s.count(resource)) {
throw exceptions::unauthorized_exception(
sprint("%s schema is protected",
@@ -174,12 +174,16 @@ future<> service::client_state::has_access(const sstring& ks, auth::permission p
}
future<bool> service::client_state::check_has_permission(auth::permission p, auth::data_resource resource) const {
if (_is_internal) {
return make_ready_future<bool>(true);
}
std::experimental::optional<auth::data_resource> parent;
if (resource.has_parent()) {
parent = resource.get_parent();
}
return auth::auth::get_permissions(_user, resource).then([this, p, parent = std::move(parent)](auth::permission_set set) {
return _auth_service->get_permissions(_user, resource).then([this, p, parent = std::move(parent)](auth::permission_set set) {
if (set.contains(p)) {
return make_ready_future<bool>(true);
}

View File

@@ -41,6 +41,7 @@
#pragma once
#include "auth/service.hh"
#include "exceptions/exceptions.hh"
#include "unimplemented.hh"
#include "timestamp.hh"
@@ -99,6 +100,8 @@ private:
// Address of a client
socket_address _remote_address;
// Only populated for external client state.
auth::service* _auth_service{nullptr};
public:
struct internal_tag {};
struct external_tag {};
@@ -122,11 +125,12 @@ public:
return _trace_state_ptr;
}
client_state(external_tag, const socket_address& remote_address = socket_address(), bool thrift = false)
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
: _is_internal(false)
, _is_thrift(thrift)
, _remote_address(remote_address) {
if (!auth::authenticator::get().require_authentication()) {
, _remote_address(remote_address)
, _auth_service(&auth_service) {
if (!auth_service.underlying_authenticator().require_authentication()) {
_user = ::make_shared<auth::authenticated_user>();
}
}
@@ -137,6 +141,16 @@ public:
client_state(internal_tag) : _keyspace("system"), _is_internal(true), _is_thrift(false) {}
// `nullptr` for internal instances.
auth::service* get_auth_service() {
return _auth_service;
}
// See above.
const auth::service* get_auth_service() const {
return _auth_service;
}
void merge(const client_state& other);
bool is_thrift() const {
@@ -155,13 +169,15 @@ public:
}
/**
* The `auth::service` should be non-`nullptr` for native protocol users.
*
* @return a ClientState object for external clients (thrift/native protocol users).
*/
static client_state for_external_calls() {
return client_state(external_tag());
static client_state for_external_calls(auth::service& ser) {
return client_state(external_tag(), ser);
}
static client_state for_external_thrift_calls() {
return client_state(external_tag(), socket_address(), true);
static client_state for_external_thrift_calls(auth::service& ser) {
return client_state(external_tag(), ser, socket_address(), true);
}
/**
@@ -288,3 +304,4 @@ public:
};
}

View File

@@ -65,7 +65,6 @@
#include <seastar/core/rwlock.hh>
#include "db/batchlog_manager.hh"
#include "db/commitlog/commitlog.hh"
#include "auth/auth.hh"
#include <seastar/net/tls.hh>
#include <seastar/net/dns.hh>
#include "utils/exceptions.hh"
@@ -100,8 +99,9 @@ int get_generation_number() {
return generation_number;
}
storage_service::storage_service(distributed<database>& db)
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service)
: _db(db)
, _auth_service(auth_service)
, _replicate_action([this] { return do_replicate_to_all_cores(); }) {
sstable_read_error.connect([this] { isolate_on_error(); });
sstable_write_error.connect([this] { isolate_on_error(); });
@@ -521,7 +521,15 @@ void storage_service::join_token_ring(int delay) {
slogger.error("{}", err);
throw std::runtime_error(err);
}
auth::auth::setup().get();
_auth_service.start(
auth::permissions_cache_config::from_db_config(_db.local().get_config()),
std::ref(cql3::get_query_processor()),
std::ref(service::get_migration_manager()),
auth::service_config::from_db_config(_db.local().get_config())).get();
_auth_service.invoke_on_all(&auth::service::start).get();
supervisor::notify("starting tracing");
tracing::tracing::start_tracing().get();
} else {
@@ -546,7 +554,14 @@ future<> storage_service::join_ring() {
slogger.error("{}", err);
throw std::runtime_error(err);
}
auth::auth::setup().get();
ss._auth_service.start(
auth::permissions_cache_config::from_db_config(ss._db.local().get_config()),
std::ref(cql3::get_query_processor()),
std::ref(service::get_migration_manager()),
auth::service_config::from_db_config(ss._db.local().get_config())).get();
ss._auth_service.invoke_on_all(&auth::service::start).get();
}
});
});
@@ -1178,7 +1193,7 @@ future<> storage_service::stop_transport() {
ss.do_stop_stream_manager().get();
slogger.info("Stop transport: shutdown stream_manager done");
auth::auth::shutdown().get();
ss._auth_service.stop().get();
slogger.info("Stop transport: auth shutdown");
slogger.info("Stop transport: done");
@@ -1918,7 +1933,7 @@ future<> storage_service::start_rpc_server() {
auto addr = cfg.rpc_address();
auto keepalive = cfg.rpc_keepalive();
return seastar::net::dns::resolve_name(addr).then([&ss, tserver, addr, port, keepalive] (seastar::net::inet_address ip) {
return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor())).then([tserver, port, addr, ip, keepalive] {
return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service)).then([tserver, port, addr, ip, keepalive] {
// #293 - do not stop anything
//engine().at_exit([tserver] {
// return tserver->stop();
@@ -1969,8 +1984,8 @@ future<> storage_service::start_native_transport() {
auto ceo = cfg.client_encryption_options();
auto keepalive = cfg.rpc_keepalive();
cql_transport::cql_load_balance lb = cql_transport::parse_load_balance(cfg.load_balance());
return seastar::net::dns::resolve_name(addr).then([cserver, addr, &cfg, lb, keepalive, ceo = std::move(ceo)] (seastar::net::inet_address ip) {
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
return seastar::net::dns::resolve_name(addr).then([&ss, cserver, addr, &cfg, lb, keepalive, ceo = std::move(ceo)] (seastar::net::inet_address ip) {
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb, std::ref(ss._auth_service)).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
// #293 - do not stop anything
//engine().at_exit([cserver] {
// return cserver->stop();

View File

@@ -39,6 +39,7 @@
#pragma once
#include "auth/service.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "locator/token_metadata.hh"
@@ -113,6 +114,7 @@ private:
private final AtomicLong notificationSerialNumber = new AtomicLong();
#endif
distributed<database>& _db;
sharded<auth::service>& _auth_service;
int _update_jobs{0};
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
@@ -129,7 +131,7 @@ private:
bool _ms_stopped = false;
bool _stream_manager_stopped = false;
public:
storage_service(distributed<database>& db);
storage_service(distributed<database>& db, sharded<auth::service>&);
void isolate_on_error();
void isolate_on_commit_error();
@@ -2243,8 +2245,8 @@ public:
}
};
inline future<> init_storage_service(distributed<database>& db) {
return service::get_storage_service().start(std::ref(db));
inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
return service::get_storage_service().start(std::ref(db), std::ref(auth_service));
}
inline future<> deinit_storage_service() {

View File

@@ -34,10 +34,11 @@
#include "tests/cql_test_env.hh"
#include "tests/cql_assertions.hh"
#include "auth/auth.hh"
#include "auth/allow_all_authenticator.hh"
#include "auth/data_resource.hh"
#include "auth/authenticator.hh"
#include "auth/password_authenticator.hh"
#include "auth/service.hh"
#include "auth/authenticated_user.hh"
#include "db/config.hh"
@@ -73,55 +74,59 @@ SEASTAR_TEST_CASE(test_data_resource) {
}
SEASTAR_TEST_CASE(test_default_authenticator) {
return do_with_cql_env([](cql_test_env&) {
BOOST_REQUIRE_EQUAL(auth::authenticator::get().require_authentication(), false);
BOOST_REQUIRE_EQUAL(auth::authenticator::get().class_name(), auth::authenticator::ALLOW_ALL_AUTHENTICATOR_NAME);
return do_with_cql_env([](cql_test_env& env) {
auto& a = env.local_auth_service().underlying_authenticator();
BOOST_REQUIRE_EQUAL(a.require_authentication(), false);
BOOST_REQUIRE_EQUAL(a.qualified_java_name(), auth::allow_all_authenticator_name());
return make_ready_future();
});
}
SEASTAR_TEST_CASE(test_password_authenticator_attributes) {
db::config cfg;
cfg.authenticator(auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME);
cfg.authenticator(auth::password_authenticator_name());
return do_with_cql_env([](cql_test_env&) {
BOOST_REQUIRE_EQUAL(auth::authenticator::get().require_authentication(), true);
BOOST_REQUIRE_EQUAL(auth::authenticator::get().class_name(), auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME);
return do_with_cql_env([](cql_test_env& env) {
auto& a = env.local_auth_service().underlying_authenticator();
BOOST_REQUIRE_EQUAL(a.require_authentication(), true);
BOOST_REQUIRE_EQUAL(a.qualified_java_name(), auth::password_authenticator_name());
return make_ready_future();
}, cfg);
}
SEASTAR_TEST_CASE(test_auth_users) {
db::config cfg;
cfg.authenticator(auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME);
cfg.authenticator(auth::password_authenticator_name());
return do_with_cql_env([](cql_test_env& env) {
return seastar::async([&env] {
auto& auth = env.local_auth_service();
return do_with_cql_env([](cql_test_env&) {
return seastar::async([] {
sstring username("fisk");
auth::auth::insert_user(username, false).get();
BOOST_REQUIRE_EQUAL(auth::auth::is_existing_user(username).get0(), true);
BOOST_REQUIRE_EQUAL(auth::auth::is_super_user(username).get0(), false);
auth.insert_user(username, false).get();
BOOST_REQUIRE_EQUAL(auth.is_existing_user(username).get0(), true);
BOOST_REQUIRE_EQUAL(auth.is_super_user(username).get0(), false);
auth::auth::insert_user(username, true).get();
BOOST_REQUIRE_EQUAL(auth::auth::is_existing_user(username).get0(), true);
BOOST_REQUIRE_EQUAL(auth::auth::is_super_user(username).get0(), true);
auth.insert_user(username, true).get();
BOOST_REQUIRE_EQUAL(auth.is_existing_user(username).get0(), true);
BOOST_REQUIRE_EQUAL(auth.is_super_user(username).get0(), true);
auth::auth::delete_user(username).get();
BOOST_REQUIRE_EQUAL(auth::auth::is_existing_user(username).get0(), false);
BOOST_REQUIRE_EQUAL(auth::auth::is_super_user(username).get0(), false);
auth.delete_user(username).get();
BOOST_REQUIRE_EQUAL(auth.is_existing_user(username).get0(), false);
BOOST_REQUIRE_EQUAL(auth.is_super_user(username).get0(), false);
});
}, cfg);
}
SEASTAR_TEST_CASE(test_password_authenticator_operations) {
db::config cfg;
cfg.authenticator(auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME);
cfg.authenticator(auth::password_authenticator_name());
/**
* Not using seastar::async due to apparent ASan bug.
* Enjoy the slightly less readable code.
*/
return do_with_cql_env([](cql_test_env&) {
return do_with_cql_env([](cql_test_env& env) {
sstring username("fisk");
sstring password("notter");
@@ -132,25 +137,26 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
auto USERNAME_KEY = authenticator::USERNAME_KEY;
auto PASSWORD_KEY = authenticator::PASSWORD_KEY;
auto& a = env.local_auth_service().underlying_authenticator();
// check non-existing user
return authenticator::get().authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([](future<user_ptr>&& f) {
return a.authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([&a](future<user_ptr>&& f) {
try {
f.get();
BOOST_FAIL("should not reach");
} catch (exceptions::authentication_exception&) {
// ok
}
}).then([=] {
return authenticator::get().create(username, { { option::PASSWORD, password} }).then([=] {
return authenticator::get().authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then([=](user_ptr user) {
}).then([=, &a] {
return a.create(username, { { option::PASSWORD, password} }).then([=, &a] {
return a.authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then([=](user_ptr user) {
BOOST_REQUIRE_EQUAL(user->name(), username);
BOOST_REQUIRE_EQUAL(user->is_anonymous(), false);
});
});
}).then([=] {
}).then([=, &a] {
// check wrong password
return authenticator::get().authenticate( { {USERNAME_KEY, username}, {PASSWORD_KEY, "hejkotte"}}).then_wrapped([](future<user_ptr>&& f) {
return a.authenticate( { {USERNAME_KEY, username}, {PASSWORD_KEY, "hejkotte"}}).then_wrapped([](future<user_ptr>&& f) {
try {
f.get();
BOOST_FAIL("should not reach");
@@ -158,9 +164,9 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
// ok
}
});
}).then([=] {
}).then([=, &a] {
// sasl
auto sasl = authenticator::get().new_sasl_challenge();
auto sasl = a.new_sasl_challenge();
BOOST_REQUIRE_EQUAL(sasl->is_complete(), false);
@@ -178,10 +184,10 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
BOOST_REQUIRE_EQUAL(user->name(), username);
BOOST_REQUIRE_EQUAL(user->is_anonymous(), false);
});
}).then([=] {
}).then([=, &a] {
// check deleted user
return authenticator::get().drop(username).then([=] {
return authenticator::get().authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([](future<user_ptr>&& f) {
return a.drop(username).then([=, &a] {
return a.authenticate({ { USERNAME_KEY, username }, { PASSWORD_KEY, password } }).then_wrapped([](future<user_ptr>&& f) {
try {
f.get();
BOOST_FAIL("should not reach");
@@ -197,7 +203,7 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
SEASTAR_TEST_CASE(test_cassandra_hash) {
db::config cfg;
cfg.authenticator(auth::password_authenticator::PASSWORD_AUTHENTICATOR_NAME);
cfg.authenticator(auth::password_authenticator_name());
return do_with_cql_env([](cql_test_env& env) {
/**
@@ -215,8 +221,8 @@ SEASTAR_TEST_CASE(test_cassandra_hash) {
auto f = env.local_qp().process("INSERT into system_auth.credentials (username, salted_hash) values (?, ?)", db::consistency_level::ONE,
{ username, salted_hash }).discard_result();
return f.then([=] {
auto& a = auth::authenticator::get();
return f.then([=, &env] {
auto& a = env.local_auth_service().underlying_authenticator();
auto USERNAME_KEY = auth::authenticator::USERNAME_KEY;
auto PASSWORD_KEY = auth::authenticator::PASSWORD_KEY;

View File

@@ -43,7 +43,7 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "auth/auth.hh"
#include "auth/service.hh"
namespace sstables {
@@ -82,13 +82,14 @@ public:
static std::atomic<bool> active;
private:
::shared_ptr<distributed<database>> _db;
::shared_ptr<sharded<auth::service>> _auth_service;
lw_shared_ptr<tmpdir> _data_dir;
private:
struct core_local_state {
service::client_state client_state;
core_local_state()
: client_state(service::client_state::for_external_calls())
core_local_state(auth::service& auth_service)
: client_state(service::client_state::for_external_calls(auth_service))
{
client_state.set_login(::make_shared<auth::authenticated_user>("cassandra"));
}
@@ -106,7 +107,7 @@ private:
return ::make_shared<service::query_state>(_core_local.local().client_state);
}
public:
single_node_cql_env(::shared_ptr<distributed<database>> db) : _db(db)
single_node_cql_env(::shared_ptr<distributed<database>> db, ::shared_ptr<sharded<auth::service>> auth_service) : _db(db), _auth_service(std::move(auth_service))
{ }
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(const sstring& text) override {
@@ -241,8 +242,12 @@ public:
return cql3::get_query_processor();
}
auth::service& local_auth_service() override {
return _auth_service->local();
}
future<> start() {
return _core_local.start();
return _core_local.start(std::ref(*_auth_service));
}
future<> stop() {
@@ -292,8 +297,10 @@ public:
ms.start(listen, std::move(7000)).get();
auto stop_ms = defer([&ms] { ms.stop().get(); });
auto auth_service = ::make_shared<sharded<auth::service>>();
auto& ss = service::get_storage_service();
ss.start(std::ref(*db)).get();
ss.start(std::ref(*db), std::ref(*auth_service)).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
db->start(std::move(*cfg)).get();
@@ -347,12 +354,12 @@ public:
auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); });
service::get_local_storage_service().init_server().get();
auto deinit_storage_service_server = defer([] {
auto deinit_storage_service_server = defer([auth_service] {
gms::stop_gossiping().get();
auth::auth::shutdown().get();
auth_service->stop().get();
});
single_node_cql_env env(db);
single_node_cql_env env(db, auth_service);
env.start().get();
auto stop_env = defer([&env] { env.stop().get(); });

View File

@@ -38,6 +38,10 @@
class database;
namespace auth {
class service;
}
namespace cql3 {
class query_processor;
}
@@ -87,6 +91,8 @@ public:
virtual distributed<database>& db() = 0;
virtual distributed<cql3::query_processor> & qp() = 0;
virtual auth::service& local_auth_service() = 0;
};
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func);

View File

@@ -61,19 +61,20 @@ namespace bpo = boost::program_options;
int main(int ac, char ** av) {
distributed<database> db;
sharded<auth::service> auth_service;
app_template app;
app.add_options()
("seed", bpo::value<std::vector<std::string>>(), "IP address of seed node")
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen");
return app.run_deprecated(ac, av, [&db, &app] {
return app.run_deprecated(ac, av, [&auth_service, &db, &app] {
auto config = app.configuration();
logging::logger_registry().set_logger_level("gossip", logging::log_level::trace);
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
utils::fb_utilities::set_broadcast_address(listen);
utils::fb_utilities::set_broadcast_rpc_address(listen);
auto vv = std::make_shared<gms::versioned_value::factory>();
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&db] {
return service::init_storage_service(db);
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&auth_service, &db] {
return service::init_storage_service(db, auth_service);
}).then([vv, listen, config] {
return netw::get_messaging_service().start(listen);
}).then([config] {

View File

@@ -40,9 +40,10 @@ thread_local disk_error_signal_type general_disk_error;
SEASTAR_TEST_CASE(test_boot_shutdown){
return seastar::async([] {
distributed<database> db;
sharded<auth::service> auth_service;
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
service::get_storage_service().start(std::ref(db)).get();
service::get_storage_service().start(std::ref(db), std::ref(auth_service)).get();
db.start().get();
netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
gms::get_failure_detector().start().get();

View File

@@ -24,17 +24,19 @@
#include <seastar/core/distributed.hh>
#include <seastar/core/thread.hh>
#include "auth/service.hh"
#include "service/storage_service.hh"
#include "message/messaging_service.hh"
class storage_service_for_tests {
distributed<database> _db;
sharded<auth::service> _auth_service;
public:
storage_service_for_tests() {
auto thread = seastar::thread_impl::get();
assert(thread);
netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
service::get_storage_service().start(std::ref(_db)).get();
service::get_storage_service().start(std::ref(_db), std::ref(_auth_service)).get();
service::get_storage_service().invoke_on_all([] (auto& ss) {
ss.enable_all_features();
}).get();

View File

@@ -198,10 +198,10 @@ private:
});
}
public:
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp)
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service)
: _db(db)
, _query_processor(qp)
, _query_state(service::client_state::for_external_thrift_calls())
, _query_state(service::client_state::for_external_thrift_calls(auth_service))
{ }
const sstring& current_keyspace() const {
@@ -215,7 +215,8 @@ public:
void login(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const AuthenticationRequest& auth_request) {
with_cob(std::move(cob), std::move(exn_cob), [&] {
auth::authenticator::credentials_map creds(auth_request.credentials.begin(), auth_request.credentials.end());
return auth::authenticator::get().authenticate(creds).then([this] (auto user) {
auto& auth_service = *_query_state.get_client_state().get_auth_service();
return auth_service.underlying_authenticator().authenticate(creds).then([this] (auto user) {
_query_state.get_client_state().set_login(std::move(user));
});
});
@@ -1902,13 +1903,15 @@ private:
class handler_factory : public CassandraCobSvIfFactory {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
auth::service& _auth_service;
public:
explicit handler_factory(distributed<database>& db,
distributed<cql3::query_processor>& qp)
: _db(db), _query_processor(qp) {}
distributed<cql3::query_processor>& qp,
auth::service& auth_service)
: _db(db), _query_processor(qp), _auth_service(auth_service) {}
typedef CassandraCobSvIf Handler;
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
return new thrift_handler(_db, _query_processor);
return new thrift_handler(_db, _query_processor, _auth_service);
}
virtual void releaseHandler(CassandraCobSvIf* handler) {
delete handler;
@@ -1916,6 +1919,6 @@ public:
};
std::unique_ptr<CassandraCobSvIfFactory>
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp) {
return std::make_unique<handler_factory>(db, qp);
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service) {
return std::make_unique<handler_factory>(db, qp, auth_service);
}

View File

@@ -23,11 +23,12 @@
#define APPS_SEASTAR_THRIFT_HANDLER_HH_
#include "Cassandra.h"
#include "auth/service.hh"
#include "database.hh"
#include "core/distributed.hh"
#include "cql3/query_processor.hh"
#include <memory>
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp);
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&);
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */

View File

@@ -59,9 +59,10 @@ public:
};
thrift_server::thrift_server(distributed<database>& db,
distributed<cql3::query_processor>& qp)
distributed<cql3::query_processor>& qp,
auth::service& auth_service)
: _stats(new thrift_stats(*this))
, _handler_factory(create_handler_factory(db, qp).release())
, _handler_factory(create_handler_factory(db, qp, auth_service).release())
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) {
}

View File

@@ -62,6 +62,10 @@ class TMemoryBuffer;
}}}
namespace auth {
class service;
}
class thrift_server {
class connection : public boost::intrusive::list_base_hook<> {
struct fake_transport;
@@ -100,7 +104,7 @@ private:
boost::intrusive::list<connection> _connections_list;
seastar::gate _stop_gate;
public:
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp);
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&);
~thrift_server();
future<> listen(ipv4_addr addr, bool keepalive);
future<> stop();

View File

@@ -64,7 +64,7 @@ struct trace_keyspace_backend_sesssion_state final : public backend_session_stat
trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
: i_tracing_backend_helper(tr)
, _dummy_query_state(service::client_state(service::client_state::external_tag{}))
, _dummy_query_state(service::client_state(service::client_state::internal_tag{}))
, _sessions(KEYSPACE_NAME, SESSIONS,
sprint("CREATE TABLE IF NOT EXISTS %s.%s ("
"session_id uuid,"

View File

@@ -260,13 +260,14 @@ private:
}
};
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb)
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service& auth_service)
: _proxy(proxy)
, _query_processor(qp)
, _max_request_size(memory::stats().total_memory() / 10)
, _memory_available(_max_request_size)
, _notifier(std::make_unique<event_notifier>())
, _lb(lb)
, _auth_service(auth_service)
{
namespace sm = seastar::metrics;
@@ -557,7 +558,7 @@ cql_server::connection::connection(cql_server& server, ipv4_addr server_addr, co
, _fd(std::move(fd))
, _read_buf(_fd.input())
, _write_buf(_fd.output())
, _client_state(service::client_state::external_tag{}, addr)
, _client_state(service::client_state::external_tag{}, server._auth_service, addr)
{
++_server._total_connections;
++_server._current_connections;
@@ -748,9 +749,9 @@ future<response_type> cql_server::connection::process_startup(uint16_t stream, b
throw exceptions::protocol_exception(sprint("Unknown compression algorithm: %s", compression));
}
}
auto& a = auth::authenticator::get();
auto& a = client_state.get_auth_service()->underlying_authenticator();
if (a.require_authentication()) {
return make_ready_future<response_type>(std::make_pair(make_autheticate(stream, a.class_name(), client_state.get_trace_state()), client_state));
return make_ready_future<response_type>(std::make_pair(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state()), client_state));
}
return make_ready_future<response_type>(std::make_pair(make_ready(stream, client_state.get_trace_state()), client_state));
}
@@ -758,7 +759,7 @@ future<response_type> cql_server::connection::process_startup(uint16_t stream, b
future<response_type> cql_server::connection::process_auth_response(uint16_t stream, bytes_view buf, service::client_state client_state)
{
if (_sasl_challenge == nullptr) {
_sasl_challenge = auth::authenticator::get().new_sasl_challenge();
_sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
}
auto challenge = _sasl_challenge->evaluate_response(buf);

View File

@@ -21,6 +21,7 @@
#pragma once
#include "auth/service.hh"
#include "core/reactor.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/migration_listener.hh"
@@ -118,8 +119,9 @@ private:
uint64_t _unpaged_queries = 0;
uint64_t _requests_serving = 0;
cql_load_balance _lb;
auth::service& _auth_service;
public:
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service&);
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
future<> do_accepts(int which, bool keepalive, ipv4_addr server_addr);
future<> stop();

View File

@@ -26,6 +26,7 @@
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/unordered_set.hpp>
#include <seastar/core/reactor.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/gate.hh>