Merge "auth: Replace delayed_tasks with sleep_abortable" from Duarte

"delayed_tasks has a bug that if the object is destroyed while a timer
callback is queued, the callback will then try to access freed memory.
This series replaces the whole thing with sleep_abortable()."

* 'auth-delayed-tasks/v2' of https://github.com/duarten/scylla:
  auth: Replace delayed_tasks with sleep_abortable
  utils/exponential_backoff_retry: Add helper to automate retries
  utils/exponential_backoff_retry: Add abort_source-based retry
This commit is contained in:
Avi Kivity
2018-01-01 13:44:01 +02:00
10 changed files with 95 additions and 164 deletions

View File

@@ -39,7 +39,23 @@ const sstring AUTH_PACKAGE_NAME("org.apache.cassandra.auth.");
}
logging::logger auth_log("auth");
static logging::logger auth_log("auth");
// Func must support being invoked more than once.
future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_function<future<>()> func) {
struct empty_state { };
return delay_until_system_ready(as).then([&as, func = std::move(func)] () mutable {
return exponential_backoff_retry::do_until_value(1s, 1min, as, [func = std::move(func)] {
return func().then_wrapped([] (auto&& f) -> stdx::optional<empty_state> {
if (f.failed()) {
auth_log.warn("Auth task failed with error, rescheduling: {}", f.get_exception());
return { };
}
return { empty_state() };
});
});
}).discard_result();
}
future<> create_metadata_table_if_missing(
const sstring& table_name,

View File

@@ -24,11 +24,12 @@
#include <chrono>
#include <seastar/core/future.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/resource.hh>
#include <seastar/core/sstring.hh>
#include "delayed_tasks.hh"
#include "log.hh"
#include "seastarx.hh"
#include "utils/exponential_backoff_retry.hh"
@@ -54,8 +55,6 @@ extern const sstring AUTH_PACKAGE_NAME;
}
extern logging::logger auth_log;
template <class Task>
future<> once_among_shards(Task&& f) {
if (engine().cpu_id() == 0u) {
@@ -65,25 +64,12 @@ future<> once_among_shards(Task&& f) {
return make_ready_future<>();
}
template <class Task>
static future<> do_execute_task(Task&& t, exponential_backoff_retry r) {
auto f = t();
return f.handle_exception([t = std::move(t), r = std::move(r)] (auto ep) mutable {
auth_log.warn("Task failed with error, rescheduling: {}", ep);
auto delay = r.retry();
return delay.then([t = std::move(t), r = std::move(r)] () mutable {
return do_execute_task(std::move(t), std::move(r));
});
});
inline future<> delay_until_system_ready(seastar::abort_source& as) {
return sleep_abortable(10s, as);
}
// Task must support being invoked more than once.
template <class Task, class Clock>
void delay_until_system_ready(delayed_tasks<Clock>& ts, Task t) {
ts.schedule_after(10s, [t = std::move(t)] () mutable {
return do_execute_task(std::move(t), exponential_backoff_retry(1s, 1min));
});
}
// Func must support being invoked more than once.
future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_function<future<>()> func);
future<> create_metadata_table_if_missing(
const sstring& table_name,

View File

@@ -80,7 +80,8 @@ auth::password_authenticator::~password_authenticator()
auth::password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
: _qp(qp)
, _migration_manager(mm) {
, _migration_manager(mm)
, _stopped(make_ready_future<>()) {
}
// TODO: blowfish
@@ -173,7 +174,7 @@ future<> auth::password_authenticator::start() {
_qp,
create_table,
_migration_manager).then([this] {
auth::delay_until_system_ready(_delayed, [this] {
_stopped = auth::do_after_system_ready(_as, [this] {
return has_existing_users().then([this](bool existing) {
if (!existing) {
return _qp.process(
@@ -196,7 +197,8 @@ future<> auth::password_authenticator::start() {
}
future<> auth::password_authenticator::stop() {
return make_ready_future<>();
_as.request_abort();
return _stopped.handle_exception_type([] (const sleep_aborted&) { });
}
db::consistency_level auth::password_authenticator::consistency_for_user(const sstring& username) {

View File

@@ -43,7 +43,7 @@
#include "authenticator.hh"
#include "cql3/query_processor.hh"
#include "delayed_tasks.hh"
#include <seastar/core/abort_source.hh>
namespace service {
class migration_manager;
@@ -55,10 +55,9 @@ const sstring& password_authenticator_name();
class password_authenticator : public authenticator {
cql3::query_processor& _qp;
::service::migration_manager& _migration_manager;
delayed_tasks<> _delayed{};
future<> _stopped;
seastar::abort_source _as;
public:
password_authenticator(cql3::query_processor&, ::service::migration_manager&);

View File

@@ -138,7 +138,8 @@ service::service(
, _authorizer(std::move(z))
, _authenticator(std::move(a))
, _role_manager(std::move(r))
, _migration_listener(std::make_unique<auth_migration_listener>(*_authorizer)) {
, _migration_listener(std::make_unique<auth_migration_listener>(*_authorizer))
, _stopped(make_ready_future<>()) {
}
service::service(
@@ -193,7 +194,7 @@ future<> service::create_metadata_if_missing() {
_qp,
users_table_query,
_migration_manager).then([this] {
delay_until_system_ready(_delayed, [this] {
_stopped = auth::do_after_system_ready(_as, [this] {
return has_existing_users().then([this](bool existing) {
if (!existing) {
//
@@ -212,12 +213,6 @@ future<> service::create_metadata_if_missing() {
db::consistency_level::ONE,
{ meta::DEFAULT_SUPERUSER_NAME, true }).then([](auto&&) {
log.info("Created default superuser '{}'", meta::DEFAULT_SUPERUSER_NAME);
}).handle_exception([](auto exn) {
try {
std::rethrow_exception(exn);
} catch (const exceptions::request_execution_exception&) {
log.warn("Skipped default superuser setup: some nodes were not ready");
}
}).discard_result();
}
@@ -253,10 +248,10 @@ future<> service::start() {
}
future<> service::stop() {
_delayed.cancel_all();
_as.request_abort();
return _permissions_cache->stop().then([this] {
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop());
auto s = _stopped.handle_exception_type([] (const sleep_aborted&) { });
return when_all_succeed(std::move(s), _role_manager->stop(), _authorizer->stop(), _authenticator->stop());
});
}

View File

@@ -24,6 +24,7 @@
#include <experimental/string_view>
#include <memory>
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
@@ -33,7 +34,6 @@
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/role_manager.hh"
#include "delayed_tasks.hh"
#include "seastarx.hh"
#include "stdx.hh"
@@ -86,7 +86,8 @@ class service final {
// Only one of these should be registered, so we end up with some unused instances. Not the end of the world.
std::unique_ptr<::service::migration_listener> _migration_listener;
delayed_tasks<> _delayed{};
future<> _stopped;
seastar::abort_source _as;
public:
service(

View File

@@ -28,6 +28,7 @@
#include <boost/algorithm/string/join.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/core/print.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/thread.hh>
@@ -208,7 +209,7 @@ bool standard_role_manager::has_existing_roles() const {
future<> standard_role_manager::start() {
return once_among_shards([this] {
return this->create_metadata_tables_if_missing().then([this] {
delay_until_system_ready(_delayed, [this] {
_stopped = auth::do_after_system_ready(_as, [this] {
return seastar::async([this] {
try {
if (this->has_existing_roles()) {
@@ -224,7 +225,7 @@ future<> standard_role_manager::start() {
{meta::DEFAULT_SUPERUSER_NAME}).get();
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
} catch (const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were ready; will retry");
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}
});
@@ -233,6 +234,11 @@ future<> standard_role_manager::start() {
});
}
future<> standard_role_manager::stop() {
_as.request_abort();
return _stopped.handle_exception_type([] (const sleep_aborted&) { });
}
future<>
standard_role_manager::create(const authenticated_user& performer, stdx::string_view role_name, const role_config& c) {
static const sstring query = sprint(

View File

@@ -26,10 +26,10 @@
#include <experimental/string_view>
#include <unordered_set>
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include "delayed_tasks.hh"
#include "stdx.hh"
#include "seastarx.hh"
@@ -47,23 +47,22 @@ stdx::string_view standard_role_manager_name() noexcept;
class standard_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::migration_manager& _migration_manager;
delayed_tasks<> _delayed{};
future<> _stopped;
seastar::abort_source _as;
public:
standard_role_manager(cql3::query_processor& qp, ::service::migration_manager& mm)
: _qp(qp), _migration_manager(mm) {
: _qp(qp)
, _migration_manager(mm)
, _stopped(make_ready_future<>()) {
}
virtual stdx::string_view qualified_java_name() const noexcept override;
virtual future<> start() override;
virtual future<> stop() override {
return make_ready_future();
}
virtual future<> stop() override;
virtual future<>
create(const authenticated_user& performer, stdx::string_view role_name, const role_config&) override;

View File

@@ -1,107 +0,0 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <algorithm>
#include <chrono>
#include <memory>
#include <list>
#include <stdexcept>
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
#include "log.hh"
#include "seastarx.hh"
//
// Delay asynchronous tasks.
//
template <class Clock = std::chrono::steady_clock>
class delayed_tasks final {
static logging::logger _logger;
// A waiter is destroyed before the timer has elapsed.
class cancelled : public std::exception {
public:
};
class waiter final {
timer<Clock> _timer;
promise<> _done{};
public:
explicit waiter(typename Clock::duration d) : _timer([this] { _done.set_value(); }) {
_timer.arm(d);
}
~waiter() {
if (_timer.armed()) {
_timer.cancel();
_done.set_exception(cancelled());
}
}
future<> get_future() noexcept {
return _done.get_future();
}
};
// `std::list` because iterators are not invalidated. We assume that look-up time is not a bottleneck.
std::list<std::unique_ptr<waiter>> _waiters{};
public:
//
// Schedule the task `f` after d` has elapsed. If the instance goes out of scope before
// the duration has elapsed, then the task is cancelled.
//
template <class Rep, class Period, class Task>
void schedule_after(std::chrono::duration<Rep, Period> d, Task f) {
_logger.trace("Adding scheduled task.");
auto iter = _waiters.insert(_waiters.end(), std::make_unique<waiter>(d));
auto& w = *iter;
w->get_future().then([this, f = std::move(f)] () mutable {
_logger.trace("Running scheduled task.");
return f();
}).then([this, iter] {
// We'll only get here if the instance is still alive, since otherwise the future will be resolved to
// `cancelled`.
_waiters.erase(iter);
}).template handle_exception_type([](const cancelled&) {
// Nothing.
return make_ready_future<>();
});
}
//
// Cancel all scheduled tasks.
//
void cancel_all() {
_waiters.clear();
}
};
template <class Clock>
logging::logger delayed_tasks<Clock>::_logger("delayed_tasks");

View File

@@ -22,6 +22,7 @@
#pragma once
#include "core/abort_source.hh"
#include "core/sleep.hh"
#include "seastarx.hh"
#include <chrono>
@@ -37,12 +38,12 @@ public:
, _sleep_time(_base_sleep_time)
, _max_sleep_time(max_sleep_time) {}
future<> retry() {
auto old_sleep_time = _sleep_time;
// calculating sleep time seconds for the next retry.
_sleep_time = std::min(_sleep_time * 2, _max_sleep_time);
future<> retry(abort_source& as) {
return sleep_abortable(update_sleep_time(), as);
}
return sleep(old_sleep_time);
future<> retry() {
return sleep(update_sleep_time());
}
// Return sleep time in seconds to be used for next retry.
@@ -54,4 +55,37 @@ public:
void reset() {
_sleep_time = _base_sleep_time;
}
};
private:
std::chrono::milliseconds update_sleep_time() {
// calculating sleep time seconds for the next retry.
return std::exchange(_sleep_time, std::min(_sleep_time * 2, _max_sleep_time));
}
template <typename T>
struct retry_type_helper;
template <typename T>
struct retry_type_helper<future<stdx::optional<T>>> {
using optional_type = stdx::optional<T>;
using future_type = future<optional_type>;
};
public:
template<typename Func>
static auto do_until_value(std::chrono::milliseconds base_sleep_time, std::chrono::milliseconds max_sleep_time, seastar::abort_source& as, Func f) {
using type_helper = retry_type_helper<std::result_of_t<Func()>>;
auto r = exponential_backoff_retry(base_sleep_time, max_sleep_time);
return seastar::repeat_until_value([r = std::move(r), &as, f = std::move(f)] () mutable {
return f().then([&] (auto&& opt) -> typename type_helper::future_type {
if (opt) {
return make_ready_future<typename type_helper::optional_type>(std::move(opt));
}
return r.retry(as).then([] () -> typename type_helper::optional_type {
return { };
});
});
});
}
};