From fcd53765850bb4ed710aa8406d2a13006e11c8df Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 7 Sep 2021 16:06:11 +0300 Subject: [PATCH] gossiper: remove uses of get_local_gossiper for its rpc server Initialization happens in the gossiper itself, so we can capture 'this'. If we need to move to shard 0, use sharded::invoke_on() to get the local instance. --- gms/gossiper.cc | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index d1ee8c77f7..d3fcd7edf2 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -473,54 +473,52 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { return make_ready_future<>(); } _ms_registered = true; - _messaging.register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { + _messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { auto from = netw::messaging_service::get_source(cinfo); // In a new fiber. - (void)smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { - auto& gossiper = gms::get_local_gossiper(); + (void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable { return gossiper.handle_syn_msg(from, std::move(syn_msg)); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep); }); return messaging_service::no_wait(); }); - _messaging.register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) { + _messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) { auto from = netw::messaging_service::get_source(cinfo); // In a new fiber. - (void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable { - auto& gossiper = gms::get_local_gossiper(); + (void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable { return gossiper.handle_ack_msg(from, std::move(msg)); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep); }); return messaging_service::no_wait(); }); - _messaging.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) { + _messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) { auto from = netw::messaging_service::get_source(cinfo); // In a new fiber. - (void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable { - return gms::get_local_gossiper().handle_ack2_msg(from, std::move(msg)); + (void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable { + return gossiper.handle_ack2_msg(from, std::move(msg)); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep); }); return messaging_service::no_wait(); }); - _messaging.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional generation_number_opt) { + _messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional generation_number_opt) { auto from = cinfo.retrieve_auxiliary("baddr"); - return gms::get_local_gossiper().handle_echo_msg(from, generation_number_opt); + return handle_echo_msg(from, generation_number_opt); }); - _messaging.register_gossip_shutdown([] (inet_address from) { + _messaging.register_gossip_shutdown([this] (inet_address from) { // In a new fiber. - (void)smp::submit_to(0, [from] { - return gms::get_local_gossiper().handle_shutdown_msg(from); + (void)container().invoke_on(0, [from] (gms::gossiper& gossiper) { + return gossiper.handle_shutdown_msg(from); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep); }); return messaging_service::no_wait(); }); - _messaging.register_gossip_get_endpoint_states([] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) { - return smp::submit_to(0, [request = std::move(request)] () mutable { - return gms::get_local_gossiper().handle_get_endpoint_states_msg(std::move(request)); + _messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) { + return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable { + return gossiper.handle_get_endpoint_states_msg(std::move(request)); }); });