Merge 'Support SSL Certificate Hot Reloading' from Calle
" Fixes #6067 Makes the scylla endpoint initializations that support TLS use reloadable certificate stores, watching used cert + key files for changes, and reload iff modified. Tests in separate dtest set. " * elcallio-calle/reloadable-tls: transport: Use reloadable tls certificates redis: Use reloadable tls certificates alternator: Use reloadable tls certificates messaging_service: Use reloadable TLS certificates
This commit is contained in:
@@ -414,7 +414,13 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
|
||||
if (https_port) {
|
||||
set_routes(_https_server._routes);
|
||||
_https_server.set_content_length_limit(server::content_length_limit);
|
||||
_https_server.set_tls_credentials(creds->build_server_credentials());
|
||||
_https_server.set_tls_credentials(creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
|
||||
if (ep) {
|
||||
slogger.warn("Exception loading {}: {}", files, ep);
|
||||
} else {
|
||||
slogger.info("Reloaded {}", files);
|
||||
}
|
||||
}).get0());
|
||||
_https_server.listen(socket_address{addr, *https_port}).get();
|
||||
_enabled_servers.push_back(std::ref(_https_server));
|
||||
slogger.info("Alternator HTTPS server listening on {} port {}", addr, *https_port);
|
||||
|
||||
@@ -432,9 +432,9 @@ future<> gossiper::handle_shutdown_msg(inet_address from) {
|
||||
});
|
||||
}
|
||||
|
||||
void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
if (_ms_registered) {
|
||||
return;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
_ms_registered = true;
|
||||
ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
|
||||
@@ -485,8 +485,9 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
|
||||
// Start listening messaging_service after gossip message handlers are registered
|
||||
if (do_bind) {
|
||||
ms().start_listen();
|
||||
return ms().start_listen();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> gossiper::uninit_messaging_service_handler() {
|
||||
@@ -1724,7 +1725,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// Although gossiper runs on cpu0 only, we need to listen incoming gossip
|
||||
// message on all cpus and forard them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
return g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
@@ -1762,7 +1763,7 @@ future<> gossiper::do_shadow_round() {
|
||||
// When peer node receives a syn message, it will send back a ack message.
|
||||
// So, we need to register gossip message handlers before sending syn message.
|
||||
get_gossiper().invoke_on_all([] (gossiper& g) {
|
||||
g.init_messaging_service_handler();
|
||||
return g.init_messaging_service_handler();
|
||||
}).get();
|
||||
|
||||
while (this->_in_shadow_round) {
|
||||
|
||||
@@ -120,7 +120,7 @@ private:
|
||||
netw::messaging_service& ms() {
|
||||
return netw::get_local_messaging_service();
|
||||
}
|
||||
void init_messaging_service_handler(bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
future<> init_messaging_service_handler(bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
future<> uninit_messaging_service_handler();
|
||||
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
|
||||
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
|
||||
|
||||
3
init.cc
3
init.cc
@@ -109,13 +109,12 @@ void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
|
||||
|
||||
// Init messaging_service
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
bool listen_now = false;
|
||||
netw::messaging_service::memory_config mcfg = { std::max<size_t>(0.08 * available_memory, 1'000'000) };
|
||||
netw::messaging_service::scheduling_config scfg;
|
||||
scfg.statement = scheduling_config.statement;
|
||||
scfg.streaming = scheduling_config.streaming;
|
||||
scfg.gossip = scheduling_config.gossip;
|
||||
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba, listen_now).get();
|
||||
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return netw::get_messaging_service().stop(); });
|
||||
|
||||
@@ -277,9 +277,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
|
||||
return _rpc->unregister_handler(verb);
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now)
|
||||
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
|
||||
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000},
|
||||
scheduling_config{}, false, listen_now)
|
||||
scheduling_config{}, false)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -292,7 +292,24 @@ rpc_resource_limits(size_t memory_limit) {
|
||||
return limits;
|
||||
}
|
||||
|
||||
void messaging_service::start_listen() {
|
||||
future<> messaging_service::start_listen() {
|
||||
if (_credentials_builder && !_credentials) {
|
||||
return _credentials_builder->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
|
||||
if (ep) {
|
||||
mlogger.warn("Exception loading {}: {}", files, ep);
|
||||
} else {
|
||||
mlogger.info("Reloaded {}", files);
|
||||
}
|
||||
}).then([this](shared_ptr<seastar::tls::server_credentials> creds) {
|
||||
_credentials = std::move(creds);
|
||||
do_start_listen();
|
||||
});
|
||||
}
|
||||
do_start_listen();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void messaging_service::do_start_listen() {
|
||||
bool listen_to_bc = _should_listen_to_broadcast_address && _listen_address != utils::fb_utilities::get_broadcast_address();
|
||||
rpc::server_options so;
|
||||
if (_compress_what != compress_what::none) {
|
||||
@@ -325,6 +342,9 @@ void messaging_service::start_listen() {
|
||||
if (_encrypt_what == encrypt_what::none) {
|
||||
return nullptr;
|
||||
}
|
||||
if (!_credentials) {
|
||||
throw std::invalid_argument("No certificates specified for encrypted service");
|
||||
}
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
lo.lba = server_socket::load_balancing_algorithm::port;
|
||||
@@ -356,8 +376,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, std::shared_ptr<seastar::tls::credentials_builder> credentials
|
||||
, messaging_service::memory_config mcfg
|
||||
, scheduling_config scfg
|
||||
, bool sltba
|
||||
, bool listen_now)
|
||||
, bool sltba)
|
||||
: _listen_address(ip)
|
||||
, _port(port)
|
||||
, _ssl_port(ssl_port)
|
||||
@@ -366,7 +385,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, _tcp_nodelay_what(tnw)
|
||||
, _should_listen_to_broadcast_address(sltba)
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
|
||||
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
|
||||
, _mcfg(mcfg)
|
||||
, _scheduling_config(scfg)
|
||||
{
|
||||
@@ -377,10 +396,6 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
ci.attach_auxiliary("max_result_size", max_result_size.value_or(query::result_memory_limiter::maximum_result_size));
|
||||
return rpc::no_wait;
|
||||
});
|
||||
|
||||
if (listen_now) {
|
||||
start_listen();
|
||||
}
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
|
||||
@@ -237,6 +237,7 @@ private:
|
||||
std::unique_ptr<rpc_protocol_wrapper> _rpc;
|
||||
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server;
|
||||
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
||||
std::unique_ptr<seastar::tls::credentials_builder> _credentials_builder;
|
||||
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
|
||||
std::array<clients_map, 4> _clients;
|
||||
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
|
||||
@@ -248,13 +249,13 @@ public:
|
||||
using clock_type = lowres_clock;
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
|
||||
uint16_t port = 7000, bool listen_now = true);
|
||||
uint16_t port = 7000);
|
||||
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, compress_what, tcp_nodelay_what,
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
|
||||
memory_config mcfg, scheduling_config scfg, bool sltba = false, bool listen_now = true);
|
||||
memory_config mcfg, scheduling_config scfg, bool sltba = false);
|
||||
~messaging_service();
|
||||
public:
|
||||
void start_listen();
|
||||
future<> start_listen();
|
||||
uint16_t port();
|
||||
gms::inet_address listen_address();
|
||||
future<> stop_tls_server();
|
||||
@@ -511,6 +512,7 @@ public:
|
||||
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
|
||||
private:
|
||||
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
|
||||
void do_start_listen();
|
||||
public:
|
||||
// Return rpc::protocol::client for a shard which is a ip + cpuid pair.
|
||||
shared_ptr<rpc_protocol_client_wrapper> get_rpc_client(messaging_verb verb, msg_addr id);
|
||||
|
||||
@@ -74,19 +74,30 @@ future<> redis_server::stop() {
|
||||
}
|
||||
|
||||
future<> redis_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(creds->build_server_credentials(), addr, lo)
|
||||
: seastar::listen(addr, lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("Redis server error while listening on %s -> %s", addr, std::current_exception()));
|
||||
auto f = make_ready_future<shared_ptr<seastar::tls::server_credentials>>(nullptr);
|
||||
if (creds) {
|
||||
f = creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
|
||||
if (ep) {
|
||||
logging.warn("Exception loading {}: {}", files, ep);
|
||||
} else {
|
||||
logging.info("Reloaded {}", files);
|
||||
}
|
||||
});
|
||||
}
|
||||
_listeners.emplace_back(std::move(ss));
|
||||
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
|
||||
return make_ready_future<>();
|
||||
return f.then([this, addr, keepalive](shared_ptr<seastar::tls::server_credentials> creds) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(std::move(creds), addr, lo)
|
||||
: seastar::listen(addr, lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("Redis server error while listening on %s -> %s", addr, std::current_exception()));
|
||||
}
|
||||
_listeners.emplace_back(std::move(ss));
|
||||
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
future<> redis_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
|
||||
|
||||
@@ -71,7 +71,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
|
||||
auto stop_snitch = defer([&] { locator::i_endpoint_snitch::stop_snitch().get(); });
|
||||
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false /* don't bind */).get();
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000).get();
|
||||
auto stop_messaging_service = defer([&] { netw::get_messaging_service().stop().get(); });
|
||||
|
||||
gms::get_gossiper().start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(*cfg)).get();
|
||||
|
||||
@@ -420,7 +420,7 @@ public:
|
||||
const gms::inet_address listen("127.0.0.1");
|
||||
auto& ms = netw::get_messaging_service();
|
||||
// don't start listening so tests can be run in parallel
|
||||
ms.start(listen, std::move(7000), false).get();
|
||||
ms.start(listen, std::move(7000)).get();
|
||||
auto stop_ms = defer([&ms] { ms.stop().get(); });
|
||||
|
||||
sharded<auth::service> auth_service;
|
||||
|
||||
@@ -54,7 +54,7 @@ public:
|
||||
_mnotif.start().get();
|
||||
_feature_service.start(gms::feature_config_from_db_config(_cfg)).get();
|
||||
_gossiper.start(std::ref(_abort_source), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cfg)).get();
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000).get();
|
||||
service::storage_service_config sscfg;
|
||||
sscfg.available_memory = memory::stats().total_memory();
|
||||
service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, std::ref(_mnotif), std::ref(_token_metadata), true).get();
|
||||
|
||||
@@ -210,19 +210,30 @@ future<> cql_server::stop() {
|
||||
|
||||
future<>
|
||||
cql_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(creds->build_server_credentials(), addr, lo)
|
||||
: seastar::listen(addr, lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("CQLServer error while listening on {} -> {}", addr, std::current_exception()));
|
||||
auto f = make_ready_future<shared_ptr<seastar::tls::server_credentials>>(nullptr);
|
||||
if (creds) {
|
||||
f = creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
|
||||
if (ep) {
|
||||
clogger.warn("Exception loading {}: {}", files, ep);
|
||||
} else {
|
||||
clogger.info("Reloaded {}", files);
|
||||
}
|
||||
});
|
||||
}
|
||||
_listeners.emplace_back(std::move(ss));
|
||||
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
|
||||
return make_ready_future<>();
|
||||
return f.then([this, addr, keepalive](shared_ptr<seastar::tls::server_credentials> creds) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(std::move(creds), addr, lo)
|
||||
: seastar::listen(addr, lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("CQLServer error while listening on {} -> {}", addr, std::current_exception()));
|
||||
}
|
||||
_listeners.emplace_back(std::move(ss));
|
||||
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
Reference in New Issue
Block a user