Merge 'Support SSL Certificate Hot Reloading' from Calle

"
Fixes #6067

Makes the scylla endpoint initializations that support TLS use reloadable certificate stores, watching the cert + key files in use for changes and reloading them iff modified.

Tests in separate dtest set.
"

* elcallio-calle/reloadable-tls:
  transport: Use reloadable tls certificates
  redis: Use reloadable tls certificates
  alternator: Use reloadable tls certificates
  messaging_service: Use reloadable TLS certificates
This commit is contained in:
Avi Kivity
2020-05-04 15:11:16 +03:00
11 changed files with 94 additions and 49 deletions

View File

@@ -414,7 +414,13 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
if (https_port) {
set_routes(_https_server._routes);
_https_server.set_content_length_limit(server::content_length_limit);
_https_server.set_tls_credentials(creds->build_server_credentials());
_https_server.set_tls_credentials(creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
if (ep) {
slogger.warn("Exception loading {}: {}", files, ep);
} else {
slogger.info("Reloaded {}", files);
}
}).get0());
_https_server.listen(socket_address{addr, *https_port}).get();
_enabled_servers.push_back(std::ref(_https_server));
slogger.info("Alternator HTTPS server listening on {} port {}", addr, *https_port);

View File

@@ -432,9 +432,9 @@ future<> gossiper::handle_shutdown_msg(inet_address from) {
});
}
void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
if (_ms_registered) {
return;
return make_ready_future<>();
}
_ms_registered = true;
ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
@@ -485,8 +485,9 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
// Start listening messaging_service after gossip message handlers are registered
if (do_bind) {
ms().start_listen();
return ms().start_listen();
}
return make_ready_future<>();
}
future<> gossiper::uninit_messaging_service_handler() {
@@ -1724,7 +1725,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
// Although gossiper runs on cpu0 only, we need to listen for incoming gossip
// messages on all cpus and forward them to cpu0 to process.
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
g.init_messaging_service_handler(do_bind);
return g.init_messaging_service_handler(do_bind);
}).then([this, generation_nbr, preload_local_states] () mutable {
build_seeds_list();
if (_cfg.force_gossip_generation() > 0) {
@@ -1762,7 +1763,7 @@ future<> gossiper::do_shadow_round() {
// When a peer node receives a syn message, it will send back an ack message.
// So, we need to register gossip message handlers before sending syn message.
get_gossiper().invoke_on_all([] (gossiper& g) {
g.init_messaging_service_handler();
return g.init_messaging_service_handler();
}).get();
while (this->_in_shadow_round) {

View File

@@ -120,7 +120,7 @@ private:
netw::messaging_service& ms() {
return netw::get_local_messaging_service();
}
void init_messaging_service_handler(bind_messaging_port do_bind = bind_messaging_port::yes);
future<> init_messaging_service_handler(bind_messaging_port do_bind = bind_messaging_port::yes);
future<> uninit_messaging_service_handler();
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);

View File

@@ -109,13 +109,12 @@ void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
// Init messaging_service
// Delay listening messaging_service until gossip message handlers are registered
bool listen_now = false;
netw::messaging_service::memory_config mcfg = { std::max<size_t>(0.08 * available_memory, 1'000'000) };
netw::messaging_service::scheduling_config scfg;
scfg.statement = scheduling_config.statement;
scfg.streaming = scheduling_config.streaming;
scfg.gossip = scheduling_config.gossip;
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba, listen_now).get();
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba).get();
// #293 - do not stop anything
//engine().at_exit([] { return netw::get_messaging_service().stop(); });

View File

@@ -277,9 +277,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
return _rpc->unregister_handler(verb);
}
messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now)
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000},
scheduling_config{}, false, listen_now)
scheduling_config{}, false)
{}
static
@@ -292,7 +292,24 @@ rpc_resource_limits(size_t memory_limit) {
return limits;
}
void messaging_service::start_listen() {
future<> messaging_service::start_listen() {
if (_credentials_builder && !_credentials) {
return _credentials_builder->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
if (ep) {
mlogger.warn("Exception loading {}: {}", files, ep);
} else {
mlogger.info("Reloaded {}", files);
}
}).then([this](shared_ptr<seastar::tls::server_credentials> creds) {
_credentials = std::move(creds);
do_start_listen();
});
}
do_start_listen();
return make_ready_future<>();
}
void messaging_service::do_start_listen() {
bool listen_to_bc = _should_listen_to_broadcast_address && _listen_address != utils::fb_utilities::get_broadcast_address();
rpc::server_options so;
if (_compress_what != compress_what::none) {
@@ -325,6 +342,9 @@ void messaging_service::start_listen() {
if (_encrypt_what == encrypt_what::none) {
return nullptr;
}
if (!_credentials) {
throw std::invalid_argument("No certificates specified for encrypted service");
}
listen_options lo;
lo.reuse_address = true;
lo.lba = server_socket::load_balancing_algorithm::port;
@@ -356,8 +376,7 @@ messaging_service::messaging_service(gms::inet_address ip
, std::shared_ptr<seastar::tls::credentials_builder> credentials
, messaging_service::memory_config mcfg
, scheduling_config scfg
, bool sltba
, bool listen_now)
, bool sltba)
: _listen_address(ip)
, _port(port)
, _ssl_port(ssl_port)
@@ -366,7 +385,7 @@ messaging_service::messaging_service(gms::inet_address ip
, _tcp_nodelay_what(tnw)
, _should_listen_to_broadcast_address(sltba)
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
, _mcfg(mcfg)
, _scheduling_config(scfg)
{
@@ -377,10 +396,6 @@ messaging_service::messaging_service(gms::inet_address ip
ci.attach_auxiliary("max_result_size", max_result_size.value_or(query::result_memory_limiter::maximum_result_size));
return rpc::no_wait;
});
if (listen_now) {
start_listen();
}
}
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {

View File

@@ -237,6 +237,7 @@ private:
std::unique_ptr<rpc_protocol_wrapper> _rpc;
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server;
::shared_ptr<seastar::tls::server_credentials> _credentials;
std::unique_ptr<seastar::tls::credentials_builder> _credentials_builder;
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
std::array<clients_map, 4> _clients;
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
@@ -248,13 +249,13 @@ public:
using clock_type = lowres_clock;
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
uint16_t port = 7000, bool listen_now = true);
uint16_t port = 7000);
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, compress_what, tcp_nodelay_what,
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
memory_config mcfg, scheduling_config scfg, bool sltba = false, bool listen_now = true);
memory_config mcfg, scheduling_config scfg, bool sltba = false);
~messaging_service();
public:
void start_listen();
future<> start_listen();
uint16_t port();
gms::inet_address listen_address();
future<> stop_tls_server();
@@ -511,6 +512,7 @@ public:
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
void do_start_listen();
public:
// Return rpc::protocol::client for a shard which is a ip + cpuid pair.
shared_ptr<rpc_protocol_client_wrapper> get_rpc_client(messaging_verb verb, msg_addr id);

View File

@@ -74,19 +74,30 @@ future<> redis_server::stop() {
}
future<> redis_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
listen_options lo;
lo.reuse_address = true;
server_socket ss;
try {
ss = creds
? seastar::tls::listen(creds->build_server_credentials(), addr, lo)
: seastar::listen(addr, lo);
} catch (...) {
throw std::runtime_error(sprint("Redis server error while listening on %s -> %s", addr, std::current_exception()));
auto f = make_ready_future<shared_ptr<seastar::tls::server_credentials>>(nullptr);
if (creds) {
f = creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
if (ep) {
logging.warn("Exception loading {}: {}", files, ep);
} else {
logging.info("Reloaded {}", files);
}
});
}
_listeners.emplace_back(std::move(ss));
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
return make_ready_future<>();
return f.then([this, addr, keepalive](shared_ptr<seastar::tls::server_credentials> creds) {
listen_options lo;
lo.reuse_address = true;
server_socket ss;
try {
ss = creds
? seastar::tls::listen(std::move(creds), addr, lo)
: seastar::listen(addr, lo);
} catch (...) {
throw std::runtime_error(sprint("Redis server error while listening on %s -> %s", addr, std::current_exception()));
}
_listeners.emplace_back(std::move(ss));
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
});
}
future<> redis_server::do_accepts(int which, bool keepalive, socket_address server_addr) {

View File

@@ -71,7 +71,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
auto stop_snitch = defer([&] { locator::i_endpoint_snitch::stop_snitch().get(); });
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false /* don't bind */).get();
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000).get();
auto stop_messaging_service = defer([&] { netw::get_messaging_service().stop().get(); });
gms::get_gossiper().start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(*cfg)).get();

View File

@@ -420,7 +420,7 @@ public:
const gms::inet_address listen("127.0.0.1");
auto& ms = netw::get_messaging_service();
// don't start listening so tests can be run in parallel
ms.start(listen, std::move(7000), false).get();
ms.start(listen, std::move(7000)).get();
auto stop_ms = defer([&ms] { ms.stop().get(); });
sharded<auth::service> auth_service;

View File

@@ -54,7 +54,7 @@ public:
_mnotif.start().get();
_feature_service.start(gms::feature_config_from_db_config(_cfg)).get();
_gossiper.start(std::ref(_abort_source), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cfg)).get();
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000).get();
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, std::ref(_mnotif), std::ref(_token_metadata), true).get();

View File

@@ -210,19 +210,30 @@ future<> cql_server::stop() {
future<>
cql_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
listen_options lo;
lo.reuse_address = true;
server_socket ss;
try {
ss = creds
? seastar::tls::listen(creds->build_server_credentials(), addr, lo)
: seastar::listen(addr, lo);
} catch (...) {
throw std::runtime_error(format("CQLServer error while listening on {} -> {}", addr, std::current_exception()));
auto f = make_ready_future<shared_ptr<seastar::tls::server_credentials>>(nullptr);
if (creds) {
f = creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
if (ep) {
clogger.warn("Exception loading {}: {}", files, ep);
} else {
clogger.info("Reloaded {}", files);
}
});
}
_listeners.emplace_back(std::move(ss));
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
return make_ready_future<>();
return f.then([this, addr, keepalive](shared_ptr<seastar::tls::server_credentials> creds) {
listen_options lo;
lo.reuse_address = true;
server_socket ss;
try {
ss = creds
? seastar::tls::listen(std::move(creds), addr, lo)
: seastar::listen(addr, lo);
} catch (...) {
throw std::runtime_error(format("CQLServer error while listening on {} -> {}", addr, std::current_exception()));
}
_listeners.emplace_back(std::move(ss));
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result();
});
}
future<>