diff --git a/alternator/controller.cc b/alternator/controller.cc
index 00bedfd734..a8b5020b8e 100644
--- a/alternator/controller.cc
+++ b/alternator/controller.cc
@@ -19,12 +19,30 @@
* along with Scylla. If not, see .
*/
+#include
#include "controller.hh"
+#include "server.hh"
+#include "executor.hh"
+#include "rmw_operation.hh"
+#include "db/config.hh"
+#include "cdc/generation_service.hh"
+#include "service/memory_limiter.hh"
using namespace seastar;
namespace alternator {
+template
+V get_or_default(const std::unordered_map& ss, const K2& key, const V2& def = V()) {
+ const auto iter = ss.find(key);
+ if (iter != ss.end()) {
+ return iter->second;
+ }
+ return def;
+}
+
+static logging::logger logger("alternator_controller");
+
controller::controller(sharded& proxy,
sharded& mm,
sharded& sys_dist_ks,
@@ -40,13 +58,85 @@ controller::controller(sharded& proxy,
, _memory_limiter(memory_limiter)
, _config(config)
{
- (void)_proxy;
- (void)_mm;
- (void)_sys_dist_ks;
- (void)_cdc_gen_svc;
- (void)_qp;
- (void)_memory_limiter;
- (void)_config;
+}
+
+future<> controller::start() {
+ return seastar::async([this] {
+ auto preferred = _config.listen_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
+ auto family = _config.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
+
+ // Create an smp_service_group to be used for limiting the
+ // concurrency when forwarding Alternator request between
+ // shards - if necessary for LWT.
+ smp_service_group_config c;
+ c.max_nonlocal_requests = 5000;
+ _ssg = create_smp_service_group(c).get0();
+
+ rmw_operation::set_default_write_isolation(_config.alternator_write_isolation());
+ executor::set_default_timeout(std::chrono::milliseconds(_config.alternator_timeout_in_ms()));
+
+ net::inet_address addr;
+ try {
+ addr = net::dns::get_host_by_name(_config.alternator_address(), family).get0().addr_list.front();
+ } catch (...) {
+ std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", _config.alternator_address())));
+ }
+
+ auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
+
+ _executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
+ _server.start(std::ref(_executor), std::ref(_qp)).get();
+ std::optional alternator_port;
+ if (_config.alternator_port()) {
+ alternator_port = _config.alternator_port();
+ }
+ std::optional alternator_https_port;
+ std::optional creds;
+ if (_config.alternator_https_port()) {
+ alternator_https_port = _config.alternator_https_port();
+ creds.emplace();
+ auto opts = _config.alternator_encryption_options();
+ if (opts.empty()) {
+ // Earlier versions mistakenly configured Alternator's
+ // HTTPS parameters via the "server_encryption_option"
+ // configuration parameter. We *temporarily* continue
+ // to allow this, for backward compatibility.
+ opts = _config.server_encryption_options();
+ if (!opts.empty()) {
+ logger.warn("Setting server_encryption_options to configure "
+ "Alternator's HTTPS encryption is deprecated. Please "
+ "switch to setting alternator_encryption_options instead.");
+ }
+ }
+ creds->set_dh_level(tls::dh_params::level::MEDIUM);
+ auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string());
+ auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string());
+ creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get();
+ auto prio = get_or_default(opts, "priority_string", sstring());
+ creds->set_priority_string(db::config::default_tls_priority);
+ if (!prio.empty()) {
+ creds->set_priority_string(prio);
+ }
+ }
+ bool alternator_enforce_authorization = _config.alternator_enforce_authorization();
+ _server.invoke_on_all(
+ [this, addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (server& server) mutable {
+ return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization,
+ &_memory_limiter.local().get_semaphore(),
+ _config.max_concurrent_requests_per_shard);
+ }).then([addr, alternator_port, alternator_https_port] {
+ logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
+ addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
+ }).get();
+ });
+}
+
+future<> controller::stop() {
+ return seastar::async([this] {
+ _server.stop().get();
+ _executor.stop().get();
+ destroy_smp_service_group(_ssg.value()).get();
+ });
}
}
diff --git a/alternator/controller.hh b/alternator/controller.hh
index 3da2b22f1e..d60411f10c 100644
--- a/alternator/controller.hh
+++ b/alternator/controller.hh
@@ -22,6 +22,7 @@
#pragma once
#include
+#include
namespace service {
class storage_proxy;
@@ -46,6 +47,9 @@ namespace alternator {
using namespace seastar;
+class executor;
+class server;
+
class controller {
sharded& _proxy;
sharded& _mm;
@@ -55,6 +59,10 @@ class controller {
sharded& _memory_limiter;
const db::config& _config;
+ sharded _executor;
+ sharded _server;
+ std::optional _ssg;
+
public:
controller(sharded& proxy,
sharded& mm,
@@ -63,6 +71,9 @@ public:
sharded& qp,
sharded& memory_limiter,
const db::config& config);
+
+ future<> start();
+ future<> stop();
};
}
diff --git a/main.cc b/main.cc
index d825f1ffd8..32c5ed7552 100644
--- a/main.cc
+++ b/main.cc
@@ -80,13 +80,11 @@
#include "thrift/controller.hh"
#include "service/memory_limiter.hh"
-#include "alternator/server.hh"
#include "redis/service.hh"
#include "cdc/log.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/generation_service.hh"
#include "alternator/tags_extension.hh"
-#include "alternator/rmw_operation.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/storage_proxy.hh"
@@ -1376,81 +1374,15 @@ int main(int ac, char** av) {
});
alternator::controller alternator_ctl(proxy, mm, sys_dist_ks, cdc_generation_service, qp, service_memory_limiter, *cfg);
- (void)alternator_ctl;
if (cfg->alternator_port() || cfg->alternator_https_port()) {
- static sharded alternator_executor;
- static sharded alternator_server;
- // Create an smp_service_group to be used for limiting the
- // concurrency when forwarding Alternator request between
- // shards - if necessary for LWT.
- smp_service_group_config c;
- c.max_nonlocal_requests = 5000;
- smp_service_group ssg = create_smp_service_group(c).get0();
-
- with_scheduling_group(dbcfg.statement_scheduling_group, [&] () mutable {
-
- alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
- alternator::executor::set_default_timeout(std::chrono::milliseconds(cfg->alternator_timeout_in_ms()));
-
- net::inet_address addr;
- try {
- addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
- } catch (...) {
- std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", cfg->alternator_address())));
- }
- alternator_executor.start(std::ref(proxy), std::ref(mm), std::ref(sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(cdc_generation_service)), ssg).get();
- alternator_server.start(std::ref(alternator_executor), std::ref(qp)).get();
- std::optional alternator_port;
- if (cfg->alternator_port()) {
- alternator_port = cfg->alternator_port();
- }
- std::optional alternator_https_port;
- std::optional creds;
- if (cfg->alternator_https_port()) {
- alternator_https_port = cfg->alternator_https_port();
- creds.emplace();
- auto opts = cfg->alternator_encryption_options();
- if (opts.empty()) {
- // Earlier versions mistakenly configured Alternator's
- // HTTPS parameters via the "server_encryption_option"
- // configuration parameter. We *temporarily* continue
- // to allow this, for backward compatibility.
- opts = cfg->server_encryption_options();
- if (!opts.empty()) {
- startlog.warn("Setting server_encryption_options to configure "
- "Alternator's HTTPS encryption is deprecated. Please "
- "switch to setting alternator_encryption_options instead.");
- }
- }
- creds->set_dh_level(tls::dh_params::level::MEDIUM);
- auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string());
- auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string());
- creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get();
- auto prio = get_or_default(opts, "priority_string", sstring());
- creds->set_priority_string(db::config::default_tls_priority);
- if (!prio.empty()) {
- creds->set_priority_string(prio);
- }
- }
- bool alternator_enforce_authorization = cfg->alternator_enforce_authorization();
- return alternator_server.invoke_on_all(
- [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg, &service_memory_limiter] (alternator::server& server) mutable {
- return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization,
- &service_memory_limiter.local().get_semaphore(),
- cfg->max_concurrent_requests_per_shard);
- }).then([addr, alternator_port, alternator_https_port] {
- startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
- addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
- });
+ with_scheduling_group(dbcfg.statement_scheduling_group, [&alternator_ctl] () mutable {
+ return alternator_ctl.start();
}).get();
- auto stop_alternator = [ssg] {
- alternator_server.stop().get();
- alternator_executor.stop().get();
- destroy_smp_service_group(ssg).get();
- };
- ss.register_client_shutdown_hook("alternator", std::move(stop_alternator));
+ ss.register_client_shutdown_hook("alternator", [&alternator_ctl] {
+ alternator_ctl.stop().get();
+ });
}
static redis_service redis;