Files
scylladb/thrift/controller.cc
Kefu Chai e0ac2eb770 redis,thrift,transport: pass config via sharded_parameter
* pass config via sharded_parameter
* initialize config using designated initializer

this change paves the road to servers with live-updateable timeout
options.

before this change, the servers initialize a domain specific combo
config, like `redis_server_config`, with the same instance of a
timeout_config, and pass the combo config as a ctor parameter to
construct each sharded service instance. but this design assumes
value semantics for the config class, that is, it should be copyable.
but if we want to use utils::updateable_value<> to get updated
option values, we would have to postpone the instantiation of the
config until the sharded service is about to be initialized.

so, in this change, instead of taking a domain specific config created
beforehand, all services constructed with a `timeout_config` will
take a `sharded_parameter()` for creating the config. also, take
this opportunity to initialize the config using designated initializers,
for two reasons:

* less repetition this way. we don't have to repeat the variable
  name of the config being initialized for each member variable.
* prepare for some member variables which do not have a default
  constructor. this applies to the timeout_config's updater which
  will not have a default constructor, as it should be initialized
  by db::config and a reference to the timeout_config to be updated.

we will update the `timeout_config` side in a follow-up commit.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
2023-03-29 20:06:00 +08:00

125 lines
3.8 KiB
C++

/*
* Copyright (C) 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "thrift/controller.hh"
#include "seastar/core/sharded.hh"
#include "thrift/server.hh"
#include "replica/database.hh"
#include "db/config.hh"
#include "log.hh"
// File-local logger for this translation unit, registered under the name
// "thrift_controller".
static logging::logger clogger("thrift_controller");
// Stores references to the sharded services that are later handed to the
// Thrift server when it is started, and creates the operations semaphore
// with a single unit.
thrift_controller::thrift_controller(
        distributed<replica::database>& db,
        sharded<auth::service>& auth,
        sharded<cql3::query_processor>& qp,
        sharded<service::memory_limiter>& ml,
        sharded<service::storage_service>& ss,
        sharded<service::storage_proxy>& proxy)
    : _ops_sem(1)
    , _db(db)
    , _auth_service(auth)
    , _qp(qp)
    , _mem_limiter(ml)
    , _ss(ss)
    , _proxy(proxy)
{
}
// Returns the fixed name of this controller: "rpc".
sstring thrift_controller::name() const {
    return sstring("rpc");
}
// Returns the fixed protocol identifier served by this controller: "thrift".
sstring thrift_controller::protocol() const {
    return sstring("thrift");
}
// Returns the protocol version, taken from ::cassandra::thrift_version.
sstring thrift_controller::protocol_version() const {
    return ::cassandra::thrift_version;
}
// Returns the recorded listen address as a one-element list, or an empty
// list when no server instance is held or no address has been recorded.
std::vector<socket_address> thrift_controller::listen_addresses() const {
    if (!_server || !_addr) {
        return {};
    }
    return {*_addr};
}
// Starts the Thrift server while holding the operations semaphore.
//
// The semaphore unit is taken without waiting; if it is not available a
// std::runtime_error is thrown. The unit is returned once do_start_server()
// has finished, whether it succeeded or failed.
future<> thrift_controller::start_server() {
    const bool acquired = _ops_sem.try_wait();
    if (!acquired) {
        throw std::runtime_error(format("Thrift server is stopping, try again later"));
    }

    return do_start_server().finally([this] {
        _ops_sem.signal();
    });
}
// Creates the sharded Thrift server, starts it and makes it listen on the
// configured RPC address. Does nothing if a server is already held in _server.
//
// The body runs in a seastar::async context and waits on each step with
// .get(). _addr is cleared up front and set again once the RPC address has
// been resolved; _server is assigned only after every step has completed.
future<> thrift_controller::do_start_server() {
    if (_server) {
        return make_ready_future<>();
    }

    return seastar::async([this] {
        auto tserver = std::make_unique<distributed<thrift_server>>();
        _addr.reset();

        auto& cfg = _db.local().get_config();

        // Address-family selection for resolving rpc_address: an explicit IPv6
        // preference, or enabled IPv6 DNS lookup, lifts the IPv4-only restriction.
        auto preferred = cfg.rpc_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
        auto family = cfg.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
        auto keepalive = cfg.rpc_keepalive();

        auto ip = utils::resolve(cfg.rpc_address, family, preferred).get();
        auto port = cfg.rpc_port();
        _addr.emplace(ip, port);

        // The server config is produced by a sharded_parameter callable rather
        // than being built here and copied into each instance.
        auto tsc = sharded_parameter([&cfg] {
            return thrift_server_config {
                .timeout_config = make_timeout_config(cfg),
                // The option is named in megabytes; scale by 2^20 to get bytes.
                .max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20),
            };
        });
        tserver->start(sharded_parameter([this] { return _db.local().as_data_dictionary(); }), std::ref(_qp), std::ref(_ss), std::ref(_proxy), std::ref(_auth_service), std::ref(_mem_limiter), std::move(tsc)).get();
        // #293 - do not stop anything
        //engine().at_exit([tserver] {
        //    return tserver->stop();
        //});
        tserver->invoke_on_all(&thrift_server::listen, socket_address{ip, port}, keepalive).get();
        clogger.info("Thrift server listening on {}:{} ...", ip, port);
        _server = std::move(tserver);
    });
}
// Permanently stops the controller; asserts that it is called on shard 0.
//
// If the controller is already marked stopped this returns immediately.
// Otherwise it waits for the operations semaphore, marks the controller
// stopped, breaks the semaphore, clears the recorded address and stops the
// server.
future<> thrift_controller::stop_server() {
    assert(this_shard_id() == 0);

    if (!_stopped) {
        return _ops_sem.wait().then([this] {
            _stopped = true;
            _ops_sem.broken();
            _addr.reset();
            return do_stop_server();
        });
    }

    return make_ready_future<>();
}
// Stops the Thrift server while holding the operations semaphore.
//
// The semaphore unit is taken without waiting; if it is not available a
// std::runtime_error is thrown. The unit is returned once do_stop_server()
// has finished, whether it succeeded or failed.
future<> thrift_controller::request_stop_server() {
    const bool acquired = _ops_sem.try_wait();
    if (!acquired) {
        throw std::runtime_error(format("Thrift server is starting, try again later"));
    }

    return do_stop_server().finally([this] {
        _ops_sem.signal();
    });
}
// Moves the server pointer out of _server and, if it holds a server, stops
// it and logs the fact. do_with keeps the moved-out pointer alive until the
// returned future resolves. With no server held this completes immediately.
future<> thrift_controller::do_stop_server() {
    return do_with(std::move(_server), [] (std::unique_ptr<distributed<thrift_server>>& tserver) {
        if (!tserver) {
            return make_ready_future<>();
        }
        return tserver->stop().then([] {
            clogger.info("Thrift server stopped");
        });
    });
}