From 697b16414aaf48bbcf61cd2a3ca55535f10944c8 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Feb 2016 09:30:18 +0800 Subject: [PATCH] gossip: Make gossip message handling async In each gossip round, i.e., gossiper::run(), we do: 1) send syn message 2) peer node: receive syn message, send back ack message 3) process ack message in handle_ack_msg apply_state_locally mark_alive send_gossip_echo handle_major_state_change on_restart mark_alive send_gossip_echo mark_dead on_dead on_join apply_new_states do_on_change_notifications on_change 4) send back ack2 message 5) peer node: process ack2 message apply_state_locally At the moment, syn is a "wait" message; it times out in 3 seconds. In step 3, all the registered gossip callbacks are called, which might take a significant amount of time to complete. In order to reduce the gossip round latency, we make syn "no-wait" and do not run handle_ack_msg inside gossiper::run(). As a result, we will not get an ack message as the return value of a syn message any more, so a GOSSIP_DIGEST_ACK message verb is introduced. With this patch, the gossip message exchange is now async. It is useful when some nodes are down in the cluster. We will no longer delay the gossip round, which is supposed to run every second, by 3*n seconds (n = 1-3, since it talks to 1-3 peer nodes in each gossip round) or even longer (considering the time to run gossip callbacks). Later, we can talk to the 1-3 peer nodes in parallel to reduce latency even more. 
Refs: #900 --- gms/gossiper.cc | 118 ++++++++++++++++++----------------- gms/gossiper.hh | 5 +- message/messaging_service.cc | 19 +++++- message/messaging_service.hh | 9 ++- tests/message.cc | 48 +++++++++----- 5 files changed, 119 insertions(+), 80 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index e2d918ae8f..ccaaf7175b 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -160,22 +160,21 @@ void gossiper::do_sort(std::vector& g_digest_list) { } } -future gossiper::handle_syn_msg(gossip_digest_syn syn_msg) { +future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) { logger.trace("cluster_name:peer={},local={},partitioner_name:peer={},local={}", syn_msg.cluster_id(), get_cluster_name(), syn_msg.partioner(), get_partitioner_name()); this->set_last_processed_message_at(); - inet_address from; if (!this->is_enabled()) { - return make_ready_future(gossip_digest_ack()); + return make_ready_future<>(); } /* If the message is from a different cluster throw it away. 
*/ if (syn_msg.cluster_id() != get_cluster_name()) { - return make_ready_future(gossip_digest_ack()); + return make_ready_future<>(); } if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) { - return make_ready_future(gossip_digest_ack()); + return make_ready_future<>(); } auto g_digest_list = syn_msg.get_gossip_digests(); @@ -184,7 +183,7 @@ future gossiper::handle_syn_msg(gossip_digest_syn syn_msg) { std::map delta_ep_state_map; this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map); gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map)); - return make_ready_future(std::move(ack_msg)); + return this->ms().send_gossip_digest_ack(from, std::move(ack_msg)); } future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { @@ -220,19 +219,50 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { } gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map)); logger.trace("Sending a GossipDigestACK2 to {}", id); - return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).then_wrapped([id] (auto&& f) { - try { - f.get(); - logger.trace("Got GossipDigestACK2 Reply"); - } catch (...) 
{ - logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, std::current_exception()); - } - return make_ready_future<>(); + return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).handle_exception([id] (auto ep) { + logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, ep); }); }); } void gossiper::init_messaging_service_handler() { + if (_ms_registered) { + return; + } + _ms_registered = true; + ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { + auto from = net::messaging_service::get_source(cinfo); + smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + return gossiper.handle_syn_msg(from, std::move(syn_msg)); + }).handle_exception([] (auto ep) { + logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep); + }); + return messaging_service::no_wait(); + }); + ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) { + auto from = net::messaging_service::get_source(cinfo); + smp::submit_to(0, [from, msg = std::move(msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + return gossiper.handle_ack_msg(from, std::move(msg)); + }).handle_exception([] (auto ep) { + logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep); + }); + return messaging_service::no_wait(); + }); + ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { + smp::submit_to(0, [msg = std::move(msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_last_processed_message_at(); + auto& remote_ep_state_map = msg.get_endpoint_state_map(); + /* Notify the Failure Detector */ + gossiper.notify_failure_detector(remote_ep_state_map); + return gossiper.apply_state_locally(remote_ep_state_map); + }).handle_exception([] (auto ep) { + logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep); + }); + return messaging_service::no_wait(); + }); ms().register_gossip_echo([] { return smp::submit_to(0, [] { auto& 
gossiper = gms::get_local_gossiper(); @@ -256,25 +286,6 @@ void gossiper::init_messaging_service_handler() { }); return messaging_service::no_wait(); }); - ms().register_gossip_digest_syn([] (gossip_digest_syn syn_msg) { - return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable { - auto& gossiper = gms::get_local_gossiper(); - return gossiper.handle_syn_msg(std::move(syn_msg)); - }); - }); - ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { - smp::submit_to(0, [msg = std::move(msg)] () mutable { - auto& gossiper = gms::get_local_gossiper(); - gossiper.set_last_processed_message_at(); - auto& remote_ep_state_map = msg.get_endpoint_state_map(); - /* Notify the Failure Detector */ - gossiper.notify_failure_detector(remote_ep_state_map); - return gossiper.apply_state_locally(remote_ep_state_map); - }).handle_exception([] (auto ep) { - logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep); - }); - return messaging_service::no_wait(); - }); } void gossiper::uninit_messaging_service_handler() { @@ -282,7 +293,9 @@ void gossiper::uninit_messaging_service_handler() { ms.unregister_gossip_echo(); ms.unregister_gossip_shutdown(); ms.unregister_gossip_digest_syn(); + ms.unregister_gossip_digest_ack(); ms.unregister_gossip_digest_ack2(); + _ms_registered = false; } future gossiper::send_gossip(gossip_digest_syn message, std::set epset) { @@ -297,18 +310,11 @@ future gossiper::send_gossip(gossip_digest_syn message, std::sethandle_ack_msg(id, std::move(ack_msg)); - } catch (...) { - // It is normal to reach here because it is normal that a node - // tries to send a SYN message to a peer node which is down before - // failure_detector thinks that peer node is down. 
- logger.trace("Fail to send GossipDigestSyn to {}: {}", id, std::current_exception()); - } - return make_ready_future<>(); + return ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + // It is normal to reach here because it is normal that a node + // tries to send a SYN message to a peer node which is down before + // failure_detector thinks that peer node is down. + logger.trace("Fail to send GossipDigestSyn to {}: {}", id, ep); }).then([this, to] { return make_ready_future(_seeds.count(to)); }); @@ -1319,6 +1325,13 @@ future<> gossiper::do_shadow_round() { build_seeds_list(); _in_shadow_round = true; auto t = clk::now(); + + // When peer node receives a syn message, it will send back a ack message. + // So, we need to register gossip message handlers before sending syn message. + get_gossiper().invoke_on_all([] (gossiper& g) { + g.init_messaging_service_handler(); + }).get(); + while (this->_in_shadow_round) { // send a completely empty syn for (inet_address seed : _seeds) { @@ -1326,24 +1339,17 @@ future<> gossiper::do_shadow_round() { gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); auto id = get_msg_addr(seed); logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); - ms().send_gossip_digest_syn(id, std::move(message)).then_wrapped([this, id] (auto&& f) { - try { - auto ack_msg = f.get0(); - logger.trace("Got GossipDigestSyn (ShadowRound) Reply"); - return this->handle_ack_msg(id, std::move(ack_msg)); - } catch (...) 
{ - logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, std::current_exception()); - } - return make_ready_future<>(); + ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); }).get(); } auto& ss = service::get_local_storage_service(); + sleep(std::chrono::seconds(1)).get(); if (clk::now() > t + ss.get_ring_delay() * 60) { throw std::runtime_error(sprint("Unable to gossip with any seeds (ShadowRound)")); } if (this->_in_shadow_round) { - logger.info("Sleep 1 second and connect seeds again ... ({} seconds passed)", std::chrono::duration_cast(clk::now() - t).count()); - sleep(std::chrono::seconds(1)).get(); + logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast(clk::now() - t).count()); } } }); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 9e85b15008..132b255402 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -88,8 +88,8 @@ private: } void init_messaging_service_handler(); void uninit_messaging_service_handler(); - future handle_syn_msg(gossip_digest_syn syn_msg); - future<> handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg); + future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg); + future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); static constexpr uint32_t _default_cpuid = 0; msg_addr get_msg_addr(inet_address to) { return msg_addr{to, _default_cpuid}; @@ -506,6 +506,7 @@ public: future<> wait_for_gossip_to_settle(); private: uint64_t _nr_run = 0; + bool _ms_registered = false; }; extern distributed _the_gossiper; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 20055e8151..98c75fa561 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -614,16 +614,29 @@ future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from) return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, 
std::move(id), std::move(from)); } -void messaging_service::register_gossip_digest_syn(std::function (gossip_digest_syn)>&& func) { +// gossip syn +void messaging_service::register_gossip_digest_syn(std::function&& func) { register_handler(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(func)); } void messaging_service::unregister_gossip_digest_syn() { _rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_SYN); } -future messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) { - return send_message_timeout(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), 3000ms, std::move(msg)); +future<> messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) { + return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(msg)); } +// gossip ack +void messaging_service::register_gossip_digest_ack(std::function&& func) { + register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(func)); +} +void messaging_service::unregister_gossip_digest_ack() { + _rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_ACK); +} +future<> messaging_service::send_gossip_digest_ack(msg_addr id, gossip_digest_ack msg) { + return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(id), std::move(msg)); +} + +// gossip ack2 void messaging_service::register_gossip_digest_ack2(std::function&& func) { register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 006b9e4b0f..771bc38f05 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -234,9 +234,14 @@ public: future<> send_gossip_shutdown(msg_addr id, inet_address from); // Wrapper for GOSSIP_DIGEST_SYN - void register_gossip_digest_syn(std::function (gms::gossip_digest_syn)>&& func); + void register_gossip_digest_syn(std::function&& func); void unregister_gossip_digest_syn(); - future 
send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg); + future<> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg); + + // Wrapper for GOSSIP_DIGEST_ACK + void register_gossip_digest_ack(std::function&& func); + void unregister_gossip_digest_ack(); + future<> send_gossip_digest_ack(msg_addr id, gms::gossip_digest_ack msg); // Wrapper for GOSSIP_DIGEST_ACK2 void register_gossip_digest_ack2(std::function&& func); diff --git a/tests/message.cc b/tests/message.cc index 619eba68b8..1f09512d03 100644 --- a/tests/message.cc +++ b/tests/message.cc @@ -20,9 +20,10 @@ * along with Scylla. If not, see . */ -#include "core/reactor.hh" -#include "core/app-template.hh" -#include "core/sstring.hh" +#include +#include +#include +#include #include "message/messaging_service.hh" #include "gms/gossip_digest_syn.hh" #include "gms/gossip_digest_ack.hh" @@ -57,10 +58,13 @@ public: future<> stop() { return make_ready_future<>(); } + promise<> digest_test_done; public: void init_handler() { - ms.register_gossip_digest_syn([] (gms::gossip_digest_syn msg) { + ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) { print("Server got syn msg = %s\n", msg); + + auto from = net::messaging_service::get_source(cinfo); auto ep1 = inet_address("1.1.1.1"); auto ep2 = inet_address("2.2.2.2"); int32_t gen = 800; @@ -74,7 +78,26 @@ public: {ep2, endpoint_state()}, }; gms::gossip_digest_ack ack(std::move(digests), std::move(eps)); - return make_ready_future(std::move(ack)); + ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) { + print("Fail to send ack : %s", ep); + }); + return messaging_service::no_wait(); + }); + + ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) { + print("Server got ack msg = %s\n", msg); + auto from = net::messaging_service::get_source(cinfo); + // Prepare gossip_digest_ack2 message + auto ep1 = inet_address("3.3.3.3"); + std::map eps{ + 
{ep1, endpoint_state()}, + }; + gms::gossip_digest_ack2 ack2(std::move(eps)); + ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) { + print("Fail to send ack2 : %s", ep); + }); + digest_test_done.set_value(); + return messaging_service::no_wait(); }); ms.register_gossip_digest_ack2([] (gms::gossip_digest_ack2 msg) { @@ -108,18 +131,8 @@ public: {ep2, gen++, ver++}, }; gms::gossip_digest_syn syn("my_cluster", "my_partition", digests); - return ms.send_gossip_digest_syn(id, std::move(syn)).then([this, id] (gms::gossip_digest_ack ack) { - print("Client sent gossip_digest_syn got gossip_digest_ack reply = %s\n", ack); - // Prepare gossip_digest_ack2 message - auto ep1 = inet_address("3.3.3.3"); - std::map eps{ - {ep1, endpoint_state()}, - }; - gms::gossip_digest_ack2 ack2(std::move(eps)); - return ms.send_gossip_digest_ack2(id, std::move(ack2)).then([] () { - print("Client sent gossip_digest_ack2 got reply = void\n"); - return make_ready_future<>(); - }); + return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] { + return digest_test_done.get_future(); }); } @@ -169,6 +182,7 @@ int main(int ac, char ** av) { api_port++; } const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); + utils::fb_utilities::set_broadcast_address(listen); net::get_messaging_service().start(listen).then([config, api_port, stay_alive] () { auto testers = new distributed; testers->start().then([testers]{