diff --git a/alternator/server.cc b/alternator/server.cc index 4ccd70809f..65bdc977e8 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -414,7 +414,13 @@ future<> server::init(net::inet_address addr, std::optional port, std: if (https_port) { set_routes(_https_server._routes); _https_server.set_content_length_limit(server::content_length_limit); - _https_server.set_tls_credentials(creds->build_server_credentials()); + _https_server.set_tls_credentials(creds->build_reloadable_server_credentials([](const std::unordered_set& files, std::exception_ptr ep) { + if (ep) { + slogger.warn("Exception loading {}: {}", files, ep); + } else { + slogger.info("Reloaded {}", files); + } + }).get0()); _https_server.listen(socket_address{addr, *https_port}).get(); _enabled_servers.push_back(std::ref(_https_server)); slogger.info("Alternator HTTPS server listening on {} port {}", addr, *https_port); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 75dea40f90..db69bafb0b 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -432,9 +432,9 @@ future<> gossiper::handle_shutdown_msg(inet_address from) { }); } -void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { +future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { if (_ms_registered) { - return; + return make_ready_future<>(); } _ms_registered = true; ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { @@ -485,8 +485,9 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { // Start listening messaging_service after gossip message handlers are registered if (do_bind) { - ms().start_listen(); + return ms().start_listen(); } + return make_ready_future<>(); } future<> gossiper::uninit_messaging_service_handler() { @@ -1724,7 +1725,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map 0) { @@ -1762,7 +1763,7 @@ future<> gossiper::do_shadow_round() { // When peer node receives a syn message, it will send back a ack message. // So, we need to register gossip message handlers before sending syn message. get_gossiper().invoke_on_all([] (gossiper& g) { - g.init_messaging_service_handler(); + return g.init_messaging_service_handler(); }).get(); while (this->_in_shadow_round) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index fb1a183185..4ff21d8dc9 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -120,7 +120,7 @@ private: netw::messaging_service& ms() { return netw::get_local_messaging_service(); } - void init_messaging_service_handler(bind_messaging_port do_bind = bind_messaging_port::yes); + future<> init_messaging_service_handler(bind_messaging_port do_bind = bind_messaging_port::yes); future<> uninit_messaging_service_handler(); future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg); future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); diff --git a/init.cc b/init.cc index 0be36ac855..ed7cee034b 100644 --- a/init.cc +++ b/init.cc @@ -109,13 +109,12 @@ void init_ms_fd_gossiper(sharded& gossiper // Init messaging_service // Delay listening messaging_service until gossip message handlers are registered - bool listen_now = false; netw::messaging_service::memory_config mcfg = { std::max(0.08 * available_memory, 1'000'000) }; netw::messaging_service::scheduling_config scfg; scfg.statement = scheduling_config.statement; scfg.streaming = scheduling_config.streaming; scfg.gossip = scheduling_config.gossip; - netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba, listen_now).get(); + netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba).get(); // #293 - do not stop anything //engine().at_exit([] { return netw::get_messaging_service().stop(); }); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 887f69af83..fd11e11c2b 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -277,9 +277,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) { return _rpc->unregister_handler(verb); } -messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now) +messaging_service::messaging_service(gms::inet_address ip, uint16_t port) : messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000}, - scheduling_config{}, false, listen_now) + scheduling_config{}, false) {} static @@ -292,7 +292,24 @@ rpc_resource_limits(size_t memory_limit) { return limits; } -void messaging_service::start_listen() { +future<> messaging_service::start_listen() { + if (_credentials_builder && !_credentials) { + return _credentials_builder->build_reloadable_server_credentials([](const std::unordered_set& files, std::exception_ptr ep) { + if (ep) { + mlogger.warn("Exception loading {}: {}", files, ep); + } else { + mlogger.info("Reloaded {}", files); + } + }).then([this](shared_ptr creds) { + _credentials = std::move(creds); + do_start_listen(); + }); + } + do_start_listen(); + return make_ready_future<>(); +} + +void messaging_service::do_start_listen() { bool listen_to_bc = _should_listen_to_broadcast_address && _listen_address != utils::fb_utilities::get_broadcast_address(); rpc::server_options so; if (_compress_what != compress_what::none) { @@ -325,6 +342,9 @@ void messaging_service::start_listen() { if (_encrypt_what == encrypt_what::none) { return nullptr; } + if (!_credentials) { + throw std::invalid_argument("No certificates specified for encrypted service"); + } listen_options lo; lo.reuse_address = true; lo.lba = server_socket::load_balancing_algorithm::port; @@ -356,8 +376,7 @@ messaging_service::messaging_service(gms::inet_address ip , std::shared_ptr credentials , messaging_service::memory_config mcfg , scheduling_config scfg - , bool sltba - , bool listen_now) + , bool sltba) : _listen_address(ip) , _port(port) , _ssl_port(ssl_port) @@ -366,7 +385,7 @@ messaging_service::messaging_service(gms::inet_address ip , _tcp_nodelay_what(tnw) , _should_listen_to_broadcast_address(sltba) , _rpc(new rpc_protocol_wrapper(serializer { })) - , _credentials(credentials ? credentials->build_server_credentials() : nullptr) + , _credentials_builder(credentials ? std::make_unique(*credentials) : nullptr) , _mcfg(mcfg) , _scheduling_config(scfg) { @@ -377,10 +396,6 @@ messaging_service::messaging_service(gms::inet_address ip ci.attach_auxiliary("max_result_size", max_result_size.value_or(query::result_memory_limiter::maximum_result_size)); return rpc::no_wait; }); - - if (listen_now) { - start_listen(); - } } msg_addr messaging_service::get_source(const rpc::client_info& cinfo) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 506ed7e0c1..dc4c99cdb9 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -237,6 +237,7 @@ private: std::unique_ptr _rpc; std::array, 2> _server; ::shared_ptr _credentials; + std::unique_ptr _credentials_builder; std::array, 2> _server_tls; std::array _clients; uint64_t _dropped_messages[static_cast(messaging_verb::LAST)] = {}; @@ -248,13 +249,13 @@ public: using clock_type = lowres_clock; public: messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), - uint16_t port = 7000, bool listen_now = true); + uint16_t port = 7000); messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, compress_what, tcp_nodelay_what, uint16_t ssl_port, std::shared_ptr, - memory_config mcfg, scheduling_config scfg, bool sltba = false, bool listen_now = true); + memory_config mcfg, scheduling_config scfg, bool sltba = false); ~messaging_service(); public: - void start_listen(); + future<> start_listen(); uint16_t port(); gms::inet_address listen_address(); future<> stop_tls_server(); @@ -511,6 +512,7 @@ public: void foreach_server_connection_stats(std::function&& f) const; private: bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only); + void do_start_listen(); public: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. shared_ptr get_rpc_client(messaging_verb verb, msg_addr id); diff --git a/redis/server.cc b/redis/server.cc index 8b27508134..eb5587385c 100644 --- a/redis/server.cc +++ b/redis/server.cc @@ -74,19 +74,30 @@ future<> redis_server::stop() { } future<> redis_server::listen(socket_address addr, std::shared_ptr creds, bool keepalive) { - listen_options lo; - lo.reuse_address = true; - server_socket ss; - try { - ss = creds - ? seastar::tls::listen(creds->build_server_credentials(), addr, lo) - : seastar::listen(addr, lo); - } catch (...) { - throw std::runtime_error(sprint("Redis server error while listening on %s -> %s", addr, std::current_exception())); + auto f = make_ready_future>(nullptr); + if (creds) { + f = creds->build_reloadable_server_credentials([](const std::unordered_set& files, std::exception_ptr ep) { + if (ep) { + logging.warn("Exception loading {}: {}", files, ep); + } else { + logging.info("Reloaded {}", files); + } + }); } - _listeners.emplace_back(std::move(ss)); - _stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result(); - return make_ready_future<>(); + return f.then([this, addr, keepalive](shared_ptr creds) { + listen_options lo; + lo.reuse_address = true; + server_socket ss; + try { + ss = creds + ? seastar::tls::listen(std::move(creds), addr, lo) + : seastar::listen(addr, lo); + } catch (...) { + throw std::runtime_error(sprint("Redis server error while listening on %s -> %s", addr, std::current_exception())); + } + _listeners.emplace_back(std::move(ss)); + _stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result(); + }); } future<> redis_server::do_accepts(int which, bool keepalive, socket_address server_addr) { diff --git a/test/boost/gossip_test.cc b/test/boost/gossip_test.cc index 92dd81de98..caab23e228 100644 --- a/test/boost/gossip_test.cc +++ b/test/boost/gossip_test.cc @@ -71,7 +71,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){ locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get(); auto stop_snitch = defer([&] { locator::i_endpoint_snitch::stop_snitch().get(); }); - netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false /* don't bind */).get(); + netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000).get(); auto stop_messaging_service = defer([&] { netw::get_messaging_service().stop().get(); }); gms::get_gossiper().start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(*cfg)).get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index d4eaa1170a..2f29343b11 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -420,7 +420,7 @@ public: const gms::inet_address listen("127.0.0.1"); auto& ms = netw::get_messaging_service(); // don't start listening so tests can be run in parallel - ms.start(listen, std::move(7000), false).get(); + ms.start(listen, std::move(7000)).get(); auto stop_ms = defer([&ms] { ms.stop().get(); }); sharded auth_service; diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index f1de94df67..bc83605d43 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -54,7 +54,7 @@ public: _mnotif.start().get(); _feature_service.start(gms::feature_config_from_db_config(_cfg)).get(); _gossiper.start(std::ref(_abort_source), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cfg)).get(); - netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get(); + netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000).get(); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, std::ref(_mnotif), std::ref(_token_metadata), true).get(); diff --git a/transport/server.cc b/transport/server.cc index 09718d57e3..4a25210774 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -210,19 +210,30 @@ future<> cql_server::stop() { future<> cql_server::listen(socket_address addr, std::shared_ptr creds, bool keepalive) { - listen_options lo; - lo.reuse_address = true; - server_socket ss; - try { - ss = creds - ? seastar::tls::listen(creds->build_server_credentials(), addr, lo) - : seastar::listen(addr, lo); - } catch (...) { - throw std::runtime_error(format("CQLServer error while listening on {} -> {}", addr, std::current_exception())); + auto f = make_ready_future>(nullptr); + if (creds) { + f = creds->build_reloadable_server_credentials([](const std::unordered_set& files, std::exception_ptr ep) { + if (ep) { + clogger.warn("Exception loading {}: {}", files, ep); + } else { + clogger.info("Reloaded {}", files); + } + }); } - _listeners.emplace_back(std::move(ss)); - _stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result(); - return make_ready_future<>(); + return f.then([this, addr, keepalive](shared_ptr creds) { + listen_options lo; + lo.reuse_address = true; + server_socket ss; + try { + ss = creds + ? seastar::tls::listen(std::move(creds), addr, lo) + : seastar::listen(addr, lo); + } catch (...) { + throw std::runtime_error(format("CQLServer error while listening on {} -> {}", addr, std::current_exception())); + } + _listeners.emplace_back(std::move(ss)); + _stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1, keepalive, addr)).discard_result(); + }); } future<>