From 9a602c7796c217685533e57e43d060717426df96 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 20 Dec 2017 22:34:22 +0100 Subject: [PATCH 1/3] utils/exponential_backoff_retry: Add abort_source-based retry Signed-off-by: Duarte Nunes --- utils/exponential_backoff_retry.hh | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/utils/exponential_backoff_retry.hh b/utils/exponential_backoff_retry.hh index 186f32027e..8046499453 100644 --- a/utils/exponential_backoff_retry.hh +++ b/utils/exponential_backoff_retry.hh @@ -22,6 +22,7 @@ #pragma once +#include "core/abort_source.hh" #include "core/sleep.hh" #include "seastarx.hh" #include @@ -37,12 +38,12 @@ public: , _sleep_time(_base_sleep_time) , _max_sleep_time(max_sleep_time) {} - future<> retry() { - auto old_sleep_time = _sleep_time; - // calculating sleep time seconds for the next retry. - _sleep_time = std::min(_sleep_time * 2, _max_sleep_time); + future<> retry(abort_source& as) { + return sleep_abortable(update_sleep_time(), as); + } - return sleep(old_sleep_time); + future<> retry() { + return sleep(update_sleep_time()); } // Return sleep time in seconds to be used for next retry. @@ -54,4 +55,11 @@ public: void reset() { _sleep_time = _base_sleep_time; } + +private: + std::chrono::milliseconds update_sleep_time() { + // calculating sleep time seconds for the next retry. + return std::exchange(_sleep_time, std::min(_sleep_time * 2, _max_sleep_time)); + } +}; }; From 40ad65666f5f509cff2bbeb6eb44ad0e8fb17c2a Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 20 Dec 2017 22:37:47 +0100 Subject: [PATCH 2/3] utils/exponential_backoff_retry: Add helper to automate retries This patch adds the do_until_value static member function to exponential_backoff_retry, which retries the specified function until it returns an engaged optional. 
Signed-off-by: Duarte Nunes --- utils/exponential_backoff_retry.hh | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/utils/exponential_backoff_retry.hh b/utils/exponential_backoff_retry.hh index 8046499453..8a63e9ebc1 100644 --- a/utils/exponential_backoff_retry.hh +++ b/utils/exponential_backoff_retry.hh @@ -61,5 +61,31 @@ private: // calculating sleep time seconds for the next retry. return std::exchange(_sleep_time, std::min(_sleep_time * 2, _max_sleep_time)); } -}; -}; + + template + struct retry_type_helper; + + template + struct retry_type_helper>> { + using optional_type = stdx::optional; + using future_type = future; + }; + +public: + template + static auto do_until_value(std::chrono::milliseconds base_sleep_time, std::chrono::milliseconds max_sleep_time, seastar::abort_source& as, Func f) { + using type_helper = retry_type_helper>; + + auto r = exponential_backoff_retry(base_sleep_time, max_sleep_time); + return seastar::repeat_until_value([r = std::move(r), &as, f = std::move(f)] () mutable { + return f().then([&] (auto&& opt) -> typename type_helper::future_type { + if (opt) { + return make_ready_future(std::move(opt)); + } + return r.retry(as).then([] () -> typename type_helper::optional_type { + return { }; + }); + }); + }); + } +}; \ No newline at end of file From 81b1455b227cf3b900f51129902e80805e3882f9 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 20 Dec 2017 22:40:38 +0100 Subject: [PATCH 3/3] auth: Replace delayed_tasks with sleep_abortable delayed_tasks has a bug that if the object is destroyed while a timer callback is queued, the callback will then try to access freed memory. This could be fixed by providing a stop() function that waits for pending callbacks, but we can just replace the whole thing by leveraging the abort_source-enabled exponential_backoff_retry. 
--- auth/common.cc | 18 +++++- auth/common.hh | 26 ++------ auth/password_authenticator.cc | 8 ++- auth/password_authenticator.hh | 7 +-- auth/service.cc | 17 ++---- auth/service.hh | 5 +- auth/standard_role_manager.cc | 10 ++- auth/standard_role_manager.hh | 15 +++-- delayed_tasks.hh | 107 --------------------------------- 9 files changed, 55 insertions(+), 158 deletions(-) delete mode 100644 delayed_tasks.hh diff --git a/auth/common.cc b/auth/common.cc index 3ee534421c..42aac3bb72 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -39,7 +39,23 @@ const sstring AUTH_PACKAGE_NAME("org.apache.cassandra.auth."); } -logging::logger auth_log("auth"); +static logging::logger auth_log("auth"); + +// Func must support being invoked more than once. +future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_function()> func) { + struct empty_state { }; + return delay_until_system_ready(as).then([&as, func = std::move(func)] () mutable { + return exponential_backoff_retry::do_until_value(1s, 1min, as, [func = std::move(func)] { + return func().then_wrapped([] (auto&& f) -> stdx::optional { + if (f.failed()) { + auth_log.warn("Auth task failed with error, rescheduling: {}", f.get_exception()); + return { }; + } + return { empty_state() }; + }); + }); + }).discard_result(); +} future<> create_metadata_table_if_missing( const sstring& table_name, diff --git a/auth/common.hh b/auth/common.hh index 60960700f0..547aa2d5b1 100644 --- a/auth/common.hh +++ b/auth/common.hh @@ -24,11 +24,12 @@ #include #include +#include +#include #include #include #include -#include "delayed_tasks.hh" #include "log.hh" #include "seastarx.hh" #include "utils/exponential_backoff_retry.hh" @@ -54,8 +55,6 @@ extern const sstring AUTH_PACKAGE_NAME; } -extern logging::logger auth_log; - template future<> once_among_shards(Task&& f) { if (engine().cpu_id() == 0u) { @@ -65,25 +64,12 @@ future<> once_among_shards(Task&& f) { return make_ready_future<>(); } -template -static future<> 
do_execute_task(Task&& t, exponential_backoff_retry r) { - auto f = t(); - return f.handle_exception([t = std::move(t), r = std::move(r)] (auto ep) mutable { - auth_log.warn("Task failed with error, rescheduling: {}", ep); - auto delay = r.retry(); - return delay.then([t = std::move(t), r = std::move(r)] () mutable { - return do_execute_task(std::move(t), std::move(r)); - }); - }); +inline future<> delay_until_system_ready(seastar::abort_source& as) { + return sleep_abortable(10s, as); } -// Task must support being invoked more than once. -template -void delay_until_system_ready(delayed_tasks& ts, Task t) { - ts.schedule_after(10s, [t = std::move(t)] () mutable { - return do_execute_task(std::move(t), exponential_backoff_retry(1s, 1min)); - }); -} +// Func must support being invoked more than once. +future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_function()> func); future<> create_metadata_table_if_missing( const sstring& table_name, diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc index 9519ac7c6e..4e14e725dc 100644 --- a/auth/password_authenticator.cc +++ b/auth/password_authenticator.cc @@ -80,7 +80,8 @@ auth::password_authenticator::~password_authenticator() auth::password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm) : _qp(qp) - , _migration_manager(mm) { + , _migration_manager(mm) + , _stopped(make_ready_future<>()) { } // TODO: blowfish @@ -173,7 +174,7 @@ future<> auth::password_authenticator::start() { _qp, create_table, _migration_manager).then([this] { - auth::delay_until_system_ready(_delayed, [this] { + _stopped = auth::do_after_system_ready(_as, [this] { return has_existing_users().then([this](bool existing) { if (!existing) { return _qp.process( @@ -196,7 +197,8 @@ future<> auth::password_authenticator::start() { } future<> auth::password_authenticator::stop() { - return make_ready_future<>(); + _as.request_abort(); + return 
_stopped.handle_exception_type([] (const sleep_aborted&) { }); } db::consistency_level auth::password_authenticator::consistency_for_user(const sstring& username) { diff --git a/auth/password_authenticator.hh b/auth/password_authenticator.hh index 7b82f4cb41..6aaa0b6d32 100644 --- a/auth/password_authenticator.hh +++ b/auth/password_authenticator.hh @@ -43,7 +43,7 @@ #include "authenticator.hh" #include "cql3/query_processor.hh" -#include "delayed_tasks.hh" +#include namespace service { class migration_manager; @@ -55,10 +55,9 @@ const sstring& password_authenticator_name(); class password_authenticator : public authenticator { cql3::query_processor& _qp; - ::service::migration_manager& _migration_manager; - - delayed_tasks<> _delayed{}; + future<> _stopped; + seastar::abort_source _as; public: password_authenticator(cql3::query_processor&, ::service::migration_manager&); diff --git a/auth/service.cc b/auth/service.cc index 10e38c3238..2d99332900 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -138,7 +138,8 @@ service::service( , _authorizer(std::move(z)) , _authenticator(std::move(a)) , _role_manager(std::move(r)) - , _migration_listener(std::make_unique(*_authorizer)) { + , _migration_listener(std::make_unique(*_authorizer)) + , _stopped(make_ready_future<>()) { } service::service( @@ -193,7 +194,7 @@ future<> service::create_metadata_if_missing() { _qp, users_table_query, _migration_manager).then([this] { - delay_until_system_ready(_delayed, [this] { + _stopped = auth::do_after_system_ready(_as, [this] { return has_existing_users().then([this](bool existing) { if (!existing) { // @@ -212,12 +213,6 @@ future<> service::create_metadata_if_missing() { db::consistency_level::ONE, { meta::DEFAULT_SUPERUSER_NAME, true }).then([](auto&&) { log.info("Created default superuser '{}'", meta::DEFAULT_SUPERUSER_NAME); - }).handle_exception([](auto exn) { - try { - std::rethrow_exception(exn); - } catch (const exceptions::request_execution_exception&) { - 
log.warn("Skipped default superuser setup: some nodes were not ready"); - } }).discard_result(); } @@ -253,10 +248,10 @@ future<> service::start() { } future<> service::stop() { - _delayed.cancel_all(); - + _as.request_abort(); return _permissions_cache->stop().then([this] { - return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()); + auto s = _stopped.handle_exception_type([] (const sleep_aborted&) { }); + return when_all_succeed(std::move(s), _role_manager->stop(), _authorizer->stop(), _authenticator->stop()); }); } diff --git a/auth/service.hh b/auth/service.hh index f3b73698bb..4aa417325a 100644 --- a/auth/service.hh +++ b/auth/service.hh @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -33,7 +34,6 @@ #include "auth/permission.hh" #include "auth/permissions_cache.hh" #include "auth/role_manager.hh" -#include "delayed_tasks.hh" #include "seastarx.hh" #include "stdx.hh" @@ -86,7 +86,8 @@ class service final { // Only one of these should be registered, so we end up with some unused instances. Not the end of the world. 
std::unique_ptr<::service::migration_listener> _migration_listener; - delayed_tasks<> _delayed{}; + future<> _stopped; + seastar::abort_source _as; public: service( diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index da801f7c59..8975f780d5 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -208,7 +209,7 @@ bool standard_role_manager::has_existing_roles() const { future<> standard_role_manager::start() { return once_among_shards([this] { return this->create_metadata_tables_if_missing().then([this] { - delay_until_system_ready(_delayed, [this] { + _stopped = auth::do_after_system_ready(_as, [this] { return seastar::async([this] { try { if (this->has_existing_roles()) { @@ -224,7 +225,7 @@ future<> standard_role_manager::start() { {meta::DEFAULT_SUPERUSER_NAME}).get(); log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME); } catch (const exceptions::unavailable_exception& e) { - log.warn("Skipped default role setup: some nodes were ready; will retry"); + log.warn("Skipped default role setup: some nodes were not ready; will retry"); throw e; } }); @@ -233,6 +234,11 @@ future<> standard_role_manager::start() { }); } +future<> standard_role_manager::stop() { + _as.request_abort(); + return _stopped.handle_exception_type([] (const sleep_aborted&) { }); +} + future<> standard_role_manager::create(const authenticated_user& performer, stdx::string_view role_name, const role_config& c) { static const sstring query = sprint( diff --git a/auth/standard_role_manager.hh b/auth/standard_role_manager.hh index cb55e8d80c..b210f84609 100644 --- a/auth/standard_role_manager.hh +++ b/auth/standard_role_manager.hh @@ -26,10 +26,10 @@ #include #include +#include #include #include -#include "delayed_tasks.hh" #include "stdx.hh" #include "seastarx.hh" @@ -47,23 +47,22 @@ stdx::string_view standard_role_manager_name() noexcept; class 
standard_role_manager final : public role_manager { cql3::query_processor& _qp; - ::service::migration_manager& _migration_manager; - - delayed_tasks<> _delayed{}; + future<> _stopped; + seastar::abort_source _as; public: standard_role_manager(cql3::query_processor& qp, ::service::migration_manager& mm) - : _qp(qp), _migration_manager(mm) { + : _qp(qp) + , _migration_manager(mm) + , _stopped(make_ready_future<>()) { } virtual stdx::string_view qualified_java_name() const noexcept override; virtual future<> start() override; - virtual future<> stop() override { - return make_ready_future(); - } + virtual future<> stop() override; virtual future<> create(const authenticated_user& performer, stdx::string_view role_name, const role_config&) override; diff --git a/delayed_tasks.hh b/delayed_tasks.hh deleted file mode 100644 index fd0ff5b437..0000000000 --- a/delayed_tasks.hh +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (C) 2017 ScyllaDB - */ - -/* - * This file is part of Scylla. - * - * Scylla is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Scylla is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Scylla. If not, see . - */ - -#pragma once - -#include -#include -#include -#include -#include - -#include -#include - -#include "log.hh" -#include "seastarx.hh" - -// -// Delay asynchronous tasks. -// -template -class delayed_tasks final { - static logging::logger _logger; - - // A waiter is destroyed before the timer has elapsed. 
- class cancelled : public std::exception { - public: - }; - - class waiter final { - timer _timer; - - promise<> _done{}; - - public: - explicit waiter(typename Clock::duration d) : _timer([this] { _done.set_value(); }) { - _timer.arm(d); - } - - ~waiter() { - if (_timer.armed()) { - _timer.cancel(); - _done.set_exception(cancelled()); - } - } - - future<> get_future() noexcept { - return _done.get_future(); - } - }; - - // `std::list` because iterators are not invalidated. We assume that look-up time is not a bottleneck. - std::list> _waiters{}; - -public: - // - // Schedule the task `f` after d` has elapsed. If the instance goes out of scope before - // the duration has elapsed, then the task is cancelled. - // - template - void schedule_after(std::chrono::duration d, Task f) { - _logger.trace("Adding scheduled task."); - - auto iter = _waiters.insert(_waiters.end(), std::make_unique(d)); - auto& w = *iter; - - w->get_future().then([this, f = std::move(f)] () mutable { - _logger.trace("Running scheduled task."); - return f(); - }).then([this, iter] { - // We'll only get here if the instance is still alive, since otherwise the future will be resolved to - // `cancelled`. - _waiters.erase(iter); - }).template handle_exception_type([](const cancelled&) { - // Nothing. - return make_ready_future<>(); - }); - } - - // - // Cancel all scheduled tasks. - // - void cancel_all() { - _waiters.clear(); - } -}; - -template -logging::logger delayed_tasks::_logger("delayed_tasks");