diff --git a/gms/gossiper.cc b/gms/gossiper.cc index af38b60a78..6cf618a547 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -313,6 +313,9 @@ void gossiper::init_messaging_service_handler() { }); return messaging_service::no_wait(); }); + + // Start listening messaging_service after gossip message handlers are registered + ms().start_listen(); } void gossiper::uninit_messaging_service_handler() { diff --git a/init.cc b/init.cc index b10b16793f..df56351c08 100644 --- a/init.cc +++ b/init.cc @@ -79,7 +79,9 @@ void init_ms_fd_gossiper(sstring listen_address } // Init messaging_service - net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get(); + // Delay listening messaging_service until gossip message handlers are registered + bool listen_now = false; + net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds, listen_now).get(); // #293 - do not stop anything //engine().at_exit([] { return net::get_messaging_service().stop(); }); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 8cf407ff3d..112335e8e2 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -207,8 +207,8 @@ void register_handler(messaging_service* ms, messaging_verb verb, Func&& func) { ms->rpc()->register_handler(verb, std::move(func)); } -messaging_service::messaging_service(gms::inet_address ip, uint16_t port) - : messaging_service(std::move(ip), port, encrypt_what::none, 0, nullptr) +messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now) + : messaging_service(std::move(ip), port, encrypt_what::none, 0, nullptr, listen_now) {} static @@ -221,31 +221,41 @@ rpc_resource_limits() { return limits; } +void messaging_service::start_listen() { + if (!_server) { + auto addr = ipv4_addr{_listen_address.raw_addr(), _port}; + _server = std::unique_ptr(new rpc_protocol_server_wrapper(*_rpc, + addr, rpc_resource_limits())); + } + + if (!_server_tls) { + _server_tls = std::unique_ptr( + [this] () -> std::unique_ptr{ + if (_encrypt_what == encrypt_what::none) { + return nullptr; + } + listen_options lo; + lo.reuse_address = true; + auto addr = make_ipv4_address(ipv4_addr{_listen_address.raw_addr(), _ssl_port}); + return std::make_unique(*_rpc, + seastar::tls::listen(_credentials, addr, lo)); + }()); + } +} + messaging_service::messaging_service(gms::inet_address ip , uint16_t port , encrypt_what ew , uint16_t ssl_port , std::shared_ptr credentials + , bool listen_now ) : _listen_address(ip) , _port(port) , _ssl_port(ssl_port) , _encrypt_what(ew) , _rpc(new rpc_protocol_wrapper(serializer { })) - , _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits())) , _credentials(credentials ? credentials->build_server_credentials() : nullptr) - , _server_tls([this]() -> std::unique_ptr{ - if (_encrypt_what == encrypt_what::none) { - return nullptr; - } - listen_options lo; - lo.reuse_address = true; - return std::make_unique(*_rpc, - seastar::tls::listen(_credentials - , make_ipv4_address(ipv4_addr {_listen_address.raw_addr(), _ssl_port}) - , lo) - ); - }()) { _rpc->set_logger([] (const sstring& log) { rpc_logger.info("{}", log); @@ -255,6 +265,11 @@ messaging_service::messaging_service(gms::inet_address ip ci.attach_auxiliary("src_cpu_id", src_cpu_id); return rpc::no_wait; }); + + if (listen_now) { + start_listen(); + } + // Do this on just cpu 0, to avoid duplicate logs. if (engine().cpu_id() == 0) { if (_server_tls) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 8a8d201fc5..eab4052717 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -184,11 +184,14 @@ private: public: using clock_type = std::chrono::steady_clock; public: - messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000); + messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), + uint16_t port = 7000, bool listen_now = true); messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, - uint16_t ssl_port, std::shared_ptr); + uint16_t ssl_port, std::shared_ptr, + bool listen_now = true); ~messaging_service(); public: + void start_listen(); uint16_t port(); gms::inet_address listen_address(); future<> stop();