From fbd98e6292dcd648c8daf6d814a13e12e1e75166 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 11 Jun 2021 17:49:24 +0300 Subject: [PATCH] alternator: Move start-stop code into controller This move is not "just move", but also includes: - putting the whole thing into seastar::async() - switch from locally captured dependencies into controller's class members - making smp_service_groups optional because it doesn't have default contructor and should somehow survive on constructed controller until its start() Also copy few bits from main that can be generalized later: - get_or_default() helper from main - sharded_parameter lambda for cdc - net family and preferred thing from main ( this also fixed the indentation broken by previous patch ) Signed-off-by: Pavel Emelyanov --- alternator/controller.cc | 104 ++++++++++++++++++++++++++++++++++++--- alternator/controller.hh | 11 +++++ main.cc | 78 ++--------------------------- 3 files changed, 113 insertions(+), 80 deletions(-) diff --git a/alternator/controller.cc b/alternator/controller.cc index 00bedfd734..a8b5020b8e 100644 --- a/alternator/controller.cc +++ b/alternator/controller.cc @@ -19,12 +19,30 @@ * along with Scylla. If not, see . */ +#include #include "controller.hh" +#include "server.hh" +#include "executor.hh" +#include "rmw_operation.hh" +#include "db/config.hh" +#include "cdc/generation_service.hh" +#include "service/memory_limiter.hh" using namespace seastar; namespace alternator { +template +V get_or_default(const std::unordered_map& ss, const K2& key, const V2& def = V()) { + const auto iter = ss.find(key); + if (iter != ss.end()) { + return iter->second; + } + return def; +} + +static logging::logger logger("alternator_controller"); + controller::controller(sharded& proxy, sharded& mm, sharded& sys_dist_ks, @@ -40,13 +58,85 @@ controller::controller(sharded& proxy, , _memory_limiter(memory_limiter) , _config(config) { - (void)_proxy; - (void)_mm; - (void)_sys_dist_ks; - (void)_cdc_gen_svc; - (void)_qp; - (void)_memory_limiter; - (void)_config; +} + +future<> controller::start() { + return seastar::async([this] { + auto preferred = _config.listen_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt; + auto family = _config.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET); + + // Create an smp_service_group to be used for limiting the + // concurrency when forwarding Alternator request between + // shards - if necessary for LWT. + smp_service_group_config c; + c.max_nonlocal_requests = 5000; + _ssg = create_smp_service_group(c).get0(); + + rmw_operation::set_default_write_isolation(_config.alternator_write_isolation()); + executor::set_default_timeout(std::chrono::milliseconds(_config.alternator_timeout_in_ms())); + + net::inet_address addr; + try { + addr = net::dns::get_host_by_name(_config.alternator_address(), family).get0().addr_list.front(); + } catch (...) { + std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", _config.alternator_address()))); + } + + auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); }; + + _executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get(); + _server.start(std::ref(_executor), std::ref(_qp)).get(); + std::optional alternator_port; + if (_config.alternator_port()) { + alternator_port = _config.alternator_port(); + } + std::optional alternator_https_port; + std::optional creds; + if (_config.alternator_https_port()) { + alternator_https_port = _config.alternator_https_port(); + creds.emplace(); + auto opts = _config.alternator_encryption_options(); + if (opts.empty()) { + // Earlier versions mistakenly configured Alternator's + // HTTPS parameters via the "server_encryption_option" + // configuration parameter. We *temporarily* continue + // to allow this, for backward compatibility. + opts = _config.server_encryption_options(); + if (!opts.empty()) { + logger.warn("Setting server_encryption_options to configure " + "Alternator's HTTPS encryption is deprecated. Please " + "switch to setting alternator_encryption_options instead."); + } + } + creds->set_dh_level(tls::dh_params::level::MEDIUM); + auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string()); + auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string()); + creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get(); + auto prio = get_or_default(opts, "priority_string", sstring()); + creds->set_priority_string(db::config::default_tls_priority); + if (!prio.empty()) { + creds->set_priority_string(prio); + } + } + bool alternator_enforce_authorization = _config.alternator_enforce_authorization(); + _server.invoke_on_all( + [this, addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (server& server) mutable { + return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, + &_memory_limiter.local().get_semaphore(), + _config.max_concurrent_requests_per_shard); + }).then([addr, alternator_port, alternator_https_port] { + logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}", + addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF"); + }).get(); + }); +} + +future<> controller::stop() { + return seastar::async([this] { + _server.stop().get(); + _executor.stop().get(); + destroy_smp_service_group(_ssg.value()).get(); + }); } } diff --git a/alternator/controller.hh b/alternator/controller.hh index 3da2b22f1e..d60411f10c 100644 --- a/alternator/controller.hh +++ b/alternator/controller.hh @@ -22,6 +22,7 @@ #pragma once #include +#include namespace service { class storage_proxy; @@ -46,6 +47,9 @@ namespace alternator { using namespace seastar; +class executor; +class server; + class controller { sharded& _proxy; sharded& _mm; @@ -55,6 +59,10 @@ class controller { sharded& _memory_limiter; const db::config& _config; + sharded _executor; + sharded _server; + std::optional _ssg; + public: controller(sharded& proxy, sharded& mm, @@ -63,6 +71,9 @@ public: sharded& qp, sharded& memory_limiter, const db::config& config); + + future<> start(); + future<> stop(); }; } diff --git a/main.cc b/main.cc index d825f1ffd8..32c5ed7552 100644 --- a/main.cc +++ b/main.cc @@ -80,13 +80,11 @@ #include "thrift/controller.hh" #include "service/memory_limiter.hh" -#include "alternator/server.hh" #include "redis/service.hh" #include "cdc/log.hh" #include "cdc/cdc_extension.hh" #include "cdc/generation_service.hh" #include "alternator/tags_extension.hh" -#include "alternator/rmw_operation.hh" #include "db/paxos_grace_seconds_extension.hh" #include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "service/storage_proxy.hh" @@ -1376,81 +1374,15 @@ int main(int ac, char** av) { }); alternator::controller alternator_ctl(proxy, mm, sys_dist_ks, cdc_generation_service, qp, service_memory_limiter, *cfg); - (void)alternator_ctl; if (cfg->alternator_port() || cfg->alternator_https_port()) { - static sharded alternator_executor; - static sharded alternator_server; - // Create an smp_service_group to be used for limiting the - // concurrency when forwarding Alternator request between - // shards - if necessary for LWT. - smp_service_group_config c; - c.max_nonlocal_requests = 5000; - smp_service_group ssg = create_smp_service_group(c).get0(); - - with_scheduling_group(dbcfg.statement_scheduling_group, [&] () mutable { - - alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation()); - alternator::executor::set_default_timeout(std::chrono::milliseconds(cfg->alternator_timeout_in_ms())); - - net::inet_address addr; - try { - addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front(); - } catch (...) { - std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", cfg->alternator_address()))); - } - alternator_executor.start(std::ref(proxy), std::ref(mm), std::ref(sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(cdc_generation_service)), ssg).get(); - alternator_server.start(std::ref(alternator_executor), std::ref(qp)).get(); - std::optional alternator_port; - if (cfg->alternator_port()) { - alternator_port = cfg->alternator_port(); - } - std::optional alternator_https_port; - std::optional creds; - if (cfg->alternator_https_port()) { - alternator_https_port = cfg->alternator_https_port(); - creds.emplace(); - auto opts = cfg->alternator_encryption_options(); - if (opts.empty()) { - // Earlier versions mistakenly configured Alternator's - // HTTPS parameters via the "server_encryption_option" - // configuration parameter. We *temporarily* continue - // to allow this, for backward compatibility. - opts = cfg->server_encryption_options(); - if (!opts.empty()) { - startlog.warn("Setting server_encryption_options to configure " - "Alternator's HTTPS encryption is deprecated. Please " - "switch to setting alternator_encryption_options instead."); - } - } - creds->set_dh_level(tls::dh_params::level::MEDIUM); - auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string()); - auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string()); - creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get(); - auto prio = get_or_default(opts, "priority_string", sstring()); - creds->set_priority_string(db::config::default_tls_priority); - if (!prio.empty()) { - creds->set_priority_string(prio); - } - } - bool alternator_enforce_authorization = cfg->alternator_enforce_authorization(); - return alternator_server.invoke_on_all( - [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg, &service_memory_limiter] (alternator::server& server) mutable { - return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, - &service_memory_limiter.local().get_semaphore(), - cfg->max_concurrent_requests_per_shard); - }).then([addr, alternator_port, alternator_https_port] { - startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}", - addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF"); - }); + with_scheduling_group(dbcfg.statement_scheduling_group, [&alternator_ctl] () mutable { + return alternator_ctl.start(); }).get(); - auto stop_alternator = [ssg] { - alternator_server.stop().get(); - alternator_executor.stop().get(); - destroy_smp_service_group(ssg).get(); - }; - ss.register_client_shutdown_hook("alternator", std::move(stop_alternator)); + ss.register_client_shutdown_hook("alternator", [&alternator_ctl] { + alternator_ctl.stop().get(); + }); } static redis_service redis;