gossiper: remove uses of get_local_gossiper for its rpc server

Initialization happens in the gossiper itself, so we can capture
'this'. If we need to move to shard 0, use sharded::invoke_on() to
get the local instance.
This commit is contained in:
Avi Kivity
2021-09-07 16:06:11 +03:00
parent 9fb9299d95
commit fcd5376585

View File

@@ -473,54 +473,52 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
return make_ready_future<>();
}
_ms_registered = true;
_messaging.register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_syn_msg(from, std::move(syn_msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
return gms::get_local_gossiper().handle_ack2_msg(from, std::move(msg));
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack2_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return gms::get_local_gossiper().handle_echo_msg(from, generation_number_opt);
return handle_echo_msg(from, generation_number_opt);
});
_messaging.register_gossip_shutdown([] (inet_address from) {
_messaging.register_gossip_shutdown([this] (inet_address from) {
// In a new fiber.
(void)smp::submit_to(0, [from] {
return gms::get_local_gossiper().handle_shutdown_msg(from);
(void)container().invoke_on(0, [from] (gms::gossiper& gossiper) {
return gossiper.handle_shutdown_msg(from);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_get_endpoint_states([] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
return smp::submit_to(0, [request = std::move(request)] () mutable {
return gms::get_local_gossiper().handle_get_endpoint_states_msg(std::move(request));
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_get_endpoint_states_msg(std::move(request));
});
});