Files
scylladb/alternator/controller.cc
Vlad Zolotarov 3a54bab193 controller: update get_client_data to use foreign_ptr for client_data
get_client_data() is used to assemble `client_data` objects from each connection
on each CPU in the context of generation of the `system.clients` virtual table data.

After collected, `client_data` objects were std::moved and arranged into a
different structure to match the table's sorting requirements.

This didn't allow having not-cross-shard-movable objects as fields in the `client_data`,
e.g. lw_shared_ptr objects.

Since we are planning to add such fields to `client_data` in following patches this patch
is solving the limitation above by making get_client_data() return `foreign_ptr<std::unique_ptr<client_data>>`
objects instead of naked `client_data` ones.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-19 11:01:41 -05:00

177 lines
7.2 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/net/dns.hh>
#include "controller.hh"
#include "server.hh"
#include "executor.hh"
#include "rmw_operation.hh"
#include "db/config.hh"
#include "cdc/generation_service.hh"
#include "service/memory_limiter.hh"
#include "auth/service.hh"
#include "service/qos/service_level_controller.hh"
using namespace seastar;
namespace alternator {
static logging::logger logger("alternator_controller");
controller::controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
sharded<service::memory_limiter>& memory_limiter,
sharded<auth::service>& auth_service,
sharded<qos::service_level_controller>& sl_controller,
const db::config& config,
seastar::scheduling_group sg)
: protocol_server(sg)
, _gossiper(gossiper)
, _proxy(proxy)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
, _cdc_gen_svc(cdc_gen_svc)
, _memory_limiter(memory_limiter)
, _auth_service(auth_service)
, _sl_controller(sl_controller)
, _config(config)
{
}
sstring controller::name() const {
return "alternator";
}
sstring controller::protocol() const {
return "dynamodb";
}
sstring controller::protocol_version() const {
return version;
}
std::vector<socket_address> controller::listen_addresses() const {
return _listen_addresses;
}
future<> controller::start_server() {
seastar::thread_attributes attr;
attr.sched_group = _sched_group;
return seastar::async(std::move(attr), [this] {
_listen_addresses.clear();
auto preferred = _config.listen_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
auto family = _config.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
// Create an smp_service_group to be used for limiting the
// concurrency when forwarding Alternator request between
// shards - if necessary for LWT.
smp_service_group_config c;
c.max_nonlocal_requests = 5000;
_ssg = create_smp_service_group(c).get();
rmw_operation::set_default_write_isolation(_config.alternator_write_isolation());
net::inet_address addr = utils::resolve(_config.alternator_address, family).get();
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
return cfg.alternator_timeout_in_ms;
};
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
// Note: from this point on, if start_server() throws for any reason,
// it must first call stop_server() to stop the executor and server
// services we just started - or Scylla will cause an assertion
// failure when the controller object is destroyed in the exception
// unwinding.
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
_listen_addresses.push_back({addr, *alternator_port});
}
std::optional<uint16_t> alternator_https_port;
std::optional<tls::credentials_builder> creds;
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
creds.emplace();
auto opts = _config.alternator_encryption_options();
if (opts.empty()) {
// Earlier versions mistakenly configured Alternator's
// HTTPS parameters via the "server_encryption_option"
// configuration parameter. We *temporarily* continue
// to allow this, for backward compatibility.
opts = _config.server_encryption_options();
if (!opts.empty()) {
logger.warn("Setting server_encryption_options to configure "
"Alternator's HTTPS encryption is deprecated. Please "
"switch to setting alternator_encryption_options instead.");
}
}
opts.erase("require_client_auth");
opts.erase("truststore");
try {
utils::configure_tls_creds_builder(creds.value(), std::move(opts)).get();
} catch(...) {
logger.error("Failed to set up Alternator TLS credentials: {}", std::current_exception());
stop_server().get();
std::throw_with_nested(std::runtime_error("Failed to set up Alternator TLS credentials"));
}
}
_server.invoke_on_all(
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}: {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF", ep);
return stop_server().then([ep = std::move(ep)] { return make_exception_future<>(ep); });
}).then([addr, alternator_port, alternator_https_port] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
}).get();
});
}
future<> controller::stop_server() {
return seastar::async([this] {
if (!_ssg) {
return;
}
_server.stop().get();
_executor.stop().get();
_listen_addresses.clear();
destroy_smp_service_group(_ssg.value()).get();
});
}
future<> controller::request_stop_server() {
return with_scheduling_group(_sched_group, [this] {
return stop_server();
});
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
return _server.local().get_client_data();
}
}