alternator: Move start-stop code into controller

This move is not "just move", but also includes:

- putting the whole thing into seastar::async()
- switch from locally captured dependencies into controller's
  class members
- making smp_service_groups optional because it doesn't have
  default contructor and should somehow survive on constructed
  controller until its start()

Also copy few bits from main that can be generalized later:

- get_or_default() helper from main
- sharded_parameter lambda for cdc
- net family and preferred thing from main

( this also fixed the indentation broken by previous patch )

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-06-11 17:49:24 +03:00
parent 9e2ad77436
commit fbd98e6292
3 changed files with 113 additions and 80 deletions

View File

@@ -19,12 +19,30 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/net/dns.hh>
#include "controller.hh"
#include "server.hh"
#include "executor.hh"
#include "rmw_operation.hh"
#include "db/config.hh"
#include "cdc/generation_service.hh"
#include "service/memory_limiter.hh"
using namespace seastar;
namespace alternator {
template<typename K, typename V, typename... Args, typename K2, typename V2 = V>
V get_or_default(const std::unordered_map<K, V, Args...>& ss, const K2& key, const V2& def = V()) {
const auto iter = ss.find(key);
if (iter != ss.end()) {
return iter->second;
}
return def;
}
static logging::logger logger("alternator_controller");
controller::controller(sharded<service::storage_proxy>& proxy,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
@@ -40,13 +58,85 @@ controller::controller(sharded<service::storage_proxy>& proxy,
, _memory_limiter(memory_limiter)
, _config(config)
{
(void)_proxy;
(void)_mm;
(void)_sys_dist_ks;
(void)_cdc_gen_svc;
(void)_qp;
(void)_memory_limiter;
(void)_config;
}
future<> controller::start() {
return seastar::async([this] {
auto preferred = _config.listen_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
auto family = _config.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
// Create an smp_service_group to be used for limiting the
// concurrency when forwarding Alternator request between
// shards - if necessary for LWT.
smp_service_group_config c;
c.max_nonlocal_requests = 5000;
_ssg = create_smp_service_group(c).get0();
rmw_operation::set_default_write_isolation(_config.alternator_write_isolation());
executor::set_default_timeout(std::chrono::milliseconds(_config.alternator_timeout_in_ms()));
net::inet_address addr;
try {
addr = net::dns::get_host_by_name(_config.alternator_address(), family).get0().addr_list.front();
} catch (...) {
std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", _config.alternator_address())));
}
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
_executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
_server.start(std::ref(_executor), std::ref(_qp)).get();
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
}
std::optional<uint16_t> alternator_https_port;
std::optional<tls::credentials_builder> creds;
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
creds.emplace();
auto opts = _config.alternator_encryption_options();
if (opts.empty()) {
// Earlier versions mistakenly configured Alternator's
// HTTPS parameters via the "server_encryption_option"
// configuration parameter. We *temporarily* continue
// to allow this, for backward compatibility.
opts = _config.server_encryption_options();
if (!opts.empty()) {
logger.warn("Setting server_encryption_options to configure "
"Alternator's HTTPS encryption is deprecated. Please "
"switch to setting alternator_encryption_options instead.");
}
}
creds->set_dh_level(tls::dh_params::level::MEDIUM);
auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string());
auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string());
creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get();
auto prio = get_or_default(opts, "priority_string", sstring());
creds->set_priority_string(db::config::default_tls_priority);
if (!prio.empty()) {
creds->set_priority_string(prio);
}
}
bool alternator_enforce_authorization = _config.alternator_enforce_authorization();
_server.invoke_on_all(
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).then([addr, alternator_port, alternator_https_port] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
}).get();
});
}
future<> controller::stop() {
return seastar::async([this] {
_server.stop().get();
_executor.stop().get();
destroy_smp_service_group(_ssg.value()).get();
});
}
}

View File

@@ -22,6 +22,7 @@
#pragma once
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
namespace service {
class storage_proxy;
@@ -46,6 +47,9 @@ namespace alternator {
using namespace seastar;
class executor;
class server;
class controller {
sharded<service::storage_proxy>& _proxy;
sharded<service::migration_manager>& _mm;
@@ -55,6 +59,10 @@ class controller {
sharded<service::memory_limiter>& _memory_limiter;
const db::config& _config;
sharded<executor> _executor;
sharded<server> _server;
std::optional<smp_service_group> _ssg;
public:
controller(sharded<service::storage_proxy>& proxy,
sharded<service::migration_manager>& mm,
@@ -63,6 +71,9 @@ public:
sharded<cql3::query_processor>& qp,
sharded<service::memory_limiter>& memory_limiter,
const db::config& config);
future<> start();
future<> stop();
};
}

78
main.cc
View File

@@ -80,13 +80,11 @@
#include "thrift/controller.hh"
#include "service/memory_limiter.hh"
#include "alternator/server.hh"
#include "redis/service.hh"
#include "cdc/log.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/generation_service.hh"
#include "alternator/tags_extension.hh"
#include "alternator/rmw_operation.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/storage_proxy.hh"
@@ -1376,81 +1374,15 @@ int main(int ac, char** av) {
});
alternator::controller alternator_ctl(proxy, mm, sys_dist_ks, cdc_generation_service, qp, service_memory_limiter, *cfg);
(void)alternator_ctl;
if (cfg->alternator_port() || cfg->alternator_https_port()) {
static sharded<alternator::executor> alternator_executor;
static sharded<alternator::server> alternator_server;
// Create an smp_service_group to be used for limiting the
// concurrency when forwarding Alternator request between
// shards - if necessary for LWT.
smp_service_group_config c;
c.max_nonlocal_requests = 5000;
smp_service_group ssg = create_smp_service_group(c).get0();
with_scheduling_group(dbcfg.statement_scheduling_group, [&] () mutable {
alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
alternator::executor::set_default_timeout(std::chrono::milliseconds(cfg->alternator_timeout_in_ms()));
net::inet_address addr;
try {
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
} catch (...) {
std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", cfg->alternator_address())));
}
alternator_executor.start(std::ref(proxy), std::ref(mm), std::ref(sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(cdc_generation_service)), ssg).get();
alternator_server.start(std::ref(alternator_executor), std::ref(qp)).get();
std::optional<uint16_t> alternator_port;
if (cfg->alternator_port()) {
alternator_port = cfg->alternator_port();
}
std::optional<uint16_t> alternator_https_port;
std::optional<tls::credentials_builder> creds;
if (cfg->alternator_https_port()) {
alternator_https_port = cfg->alternator_https_port();
creds.emplace();
auto opts = cfg->alternator_encryption_options();
if (opts.empty()) {
// Earlier versions mistakenly configured Alternator's
// HTTPS parameters via the "server_encryption_option"
// configuration parameter. We *temporarily* continue
// to allow this, for backward compatibility.
opts = cfg->server_encryption_options();
if (!opts.empty()) {
startlog.warn("Setting server_encryption_options to configure "
"Alternator's HTTPS encryption is deprecated. Please "
"switch to setting alternator_encryption_options instead.");
}
}
creds->set_dh_level(tls::dh_params::level::MEDIUM);
auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string());
auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string());
creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get();
auto prio = get_or_default(opts, "priority_string", sstring());
creds->set_priority_string(db::config::default_tls_priority);
if (!prio.empty()) {
creds->set_priority_string(prio);
}
}
bool alternator_enforce_authorization = cfg->alternator_enforce_authorization();
return alternator_server.invoke_on_all(
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg, &service_memory_limiter] (alternator::server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization,
&service_memory_limiter.local().get_semaphore(),
cfg->max_concurrent_requests_per_shard);
}).then([addr, alternator_port, alternator_https_port] {
startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
});
with_scheduling_group(dbcfg.statement_scheduling_group, [&alternator_ctl] () mutable {
return alternator_ctl.start();
}).get();
auto stop_alternator = [ssg] {
alternator_server.stop().get();
alternator_executor.stop().get();
destroy_smp_service_group(ssg).get();
};
ss.register_client_shutdown_hook("alternator", std::move(stop_alternator));
ss.register_client_shutdown_hook("alternator", [&alternator_ctl] {
alternator_ctl.stop().get();
});
}
static redis_service redis;