diff --git a/gms/gossiper.cc b/gms/gossiper.cc index e2d918ae8f..ccaaf7175b 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -160,22 +160,21 @@ void gossiper::do_sort(std::vector& g_digest_list) { } } -future gossiper::handle_syn_msg(gossip_digest_syn syn_msg) { +future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) { logger.trace("cluster_name:peer={},local={},partitioner_name:peer={},local={}", syn_msg.cluster_id(), get_cluster_name(), syn_msg.partioner(), get_partitioner_name()); this->set_last_processed_message_at(); - inet_address from; if (!this->is_enabled()) { - return make_ready_future(gossip_digest_ack()); + return make_ready_future<>(); } /* If the message is from a different cluster throw it away. */ if (syn_msg.cluster_id() != get_cluster_name()) { - return make_ready_future(gossip_digest_ack()); + return make_ready_future<>(); } if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) { - return make_ready_future(gossip_digest_ack()); + return make_ready_future<>(); } auto g_digest_list = syn_msg.get_gossip_digests(); @@ -184,7 +183,7 @@ future gossiper::handle_syn_msg(gossip_digest_syn syn_msg) { std::map delta_ep_state_map; this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map); gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map)); - return make_ready_future(std::move(ack_msg)); + return this->ms().send_gossip_digest_ack(from, std::move(ack_msg)); } future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { @@ -220,19 +219,50 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { } gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map)); logger.trace("Sending a GossipDigestACK2 to {}", id); - return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).then_wrapped([id] (auto&& f) { - try { - f.get(); - logger.trace("Got GossipDigestACK2 Reply"); - } catch (...) { - logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, std::current_exception()); - } - return make_ready_future<>(); + return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).handle_exception([id] (auto ep) { + logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, ep); }); }); } void gossiper::init_messaging_service_handler() { + if (_ms_registered) { + return; + } + _ms_registered = true; + ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { + auto from = net::messaging_service::get_source(cinfo); + smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + return gossiper.handle_syn_msg(from, std::move(syn_msg)); + }).handle_exception([] (auto ep) { + logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep); + }); + return messaging_service::no_wait(); + }); + ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) { + auto from = net::messaging_service::get_source(cinfo); + smp::submit_to(0, [from, msg = std::move(msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + return gossiper.handle_ack_msg(from, std::move(msg)); + }).handle_exception([] (auto ep) { + logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep); + }); + return messaging_service::no_wait(); + }); + ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { + smp::submit_to(0, [msg = std::move(msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_last_processed_message_at(); + auto& remote_ep_state_map = msg.get_endpoint_state_map(); + /* Notify the Failure Detector */ + gossiper.notify_failure_detector(remote_ep_state_map); + return gossiper.apply_state_locally(remote_ep_state_map); + }).handle_exception([] (auto ep) { + logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep); + }); + return messaging_service::no_wait(); + }); ms().register_gossip_echo([] { return smp::submit_to(0, [] { auto& gossiper = gms::get_local_gossiper(); @@ -256,25 +286,6 @@ void gossiper::init_messaging_service_handler() { }); return messaging_service::no_wait(); }); - ms().register_gossip_digest_syn([] (gossip_digest_syn syn_msg) { - return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable { - auto& gossiper = gms::get_local_gossiper(); - return gossiper.handle_syn_msg(std::move(syn_msg)); - }); - }); - ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { - smp::submit_to(0, [msg = std::move(msg)] () mutable { - auto& gossiper = gms::get_local_gossiper(); - gossiper.set_last_processed_message_at(); - auto& remote_ep_state_map = msg.get_endpoint_state_map(); - /* Notify the Failure Detector */ - gossiper.notify_failure_detector(remote_ep_state_map); - return gossiper.apply_state_locally(remote_ep_state_map); - }).handle_exception([] (auto ep) { - logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep); - }); - return messaging_service::no_wait(); - }); } void gossiper::uninit_messaging_service_handler() { @@ -282,7 +293,9 @@ void gossiper::uninit_messaging_service_handler() { ms.unregister_gossip_echo(); ms.unregister_gossip_shutdown(); ms.unregister_gossip_digest_syn(); + ms.unregister_gossip_digest_ack(); ms.unregister_gossip_digest_ack2(); + _ms_registered = false; } future gossiper::send_gossip(gossip_digest_syn message, std::set epset) { @@ -297,18 +310,11 @@ future gossiper::send_gossip(gossip_digest_syn message, std::sethandle_ack_msg(id, std::move(ack_msg)); - } catch (...) { - // It is normal to reach here because it is normal that a node - // tries to send a SYN message to a peer node which is down before - // failure_detector thinks that peer node is down. - logger.trace("Fail to send GossipDigestSyn to {}: {}", id, std::current_exception()); - } - return make_ready_future<>(); + return ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + // It is normal to reach here because it is normal that a node + // tries to send a SYN message to a peer node which is down before + // failure_detector thinks that peer node is down. + logger.trace("Fail to send GossipDigestSyn to {}: {}", id, ep); }).then([this, to] { return make_ready_future(_seeds.count(to)); }); @@ -1319,6 +1325,13 @@ future<> gossiper::do_shadow_round() { build_seeds_list(); _in_shadow_round = true; auto t = clk::now(); + + // When peer node receives a syn message, it will send back a ack message. + // So, we need to register gossip message handlers before sending syn message. + get_gossiper().invoke_on_all([] (gossiper& g) { + g.init_messaging_service_handler(); + }).get(); + while (this->_in_shadow_round) { // send a completely empty syn for (inet_address seed : _seeds) { @@ -1326,24 +1339,17 @@ future<> gossiper::do_shadow_round() { gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); auto id = get_msg_addr(seed); logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); - ms().send_gossip_digest_syn(id, std::move(message)).then_wrapped([this, id] (auto&& f) { - try { - auto ack_msg = f.get0(); - logger.trace("Got GossipDigestSyn (ShadowRound) Reply"); - return this->handle_ack_msg(id, std::move(ack_msg)); - } catch (...) { - logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, std::current_exception()); - } - return make_ready_future<>(); + ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); }).get(); } auto& ss = service::get_local_storage_service(); + sleep(std::chrono::seconds(1)).get(); if (clk::now() > t + ss.get_ring_delay() * 60) { throw std::runtime_error(sprint("Unable to gossip with any seeds (ShadowRound)")); } if (this->_in_shadow_round) { - logger.info("Sleep 1 second and connect seeds again ... ({} seconds passed)", std::chrono::duration_cast(clk::now() - t).count()); - sleep(std::chrono::seconds(1)).get(); + logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast(clk::now() - t).count()); } } }); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 9e85b15008..132b255402 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -88,8 +88,8 @@ private: } void init_messaging_service_handler(); void uninit_messaging_service_handler(); - future handle_syn_msg(gossip_digest_syn syn_msg); - future<> handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg); + future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg); + future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); static constexpr uint32_t _default_cpuid = 0; msg_addr get_msg_addr(inet_address to) { return msg_addr{to, _default_cpuid}; @@ -506,6 +506,7 @@ public: future<> wait_for_gossip_to_settle(); private: uint64_t _nr_run = 0; + bool _ms_registered = false; }; extern distributed _the_gossiper; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 20055e8151..98c75fa561 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -614,16 +614,29 @@ future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from) return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from)); } -void messaging_service::register_gossip_digest_syn(std::function (gossip_digest_syn)>&& func) { +// gossip syn +void messaging_service::register_gossip_digest_syn(std::function&& func) { register_handler(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(func)); } void messaging_service::unregister_gossip_digest_syn() { _rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_SYN); } -future messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) { - return send_message_timeout(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), 3000ms, std::move(msg)); +future<> messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) { + return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(msg)); } +// gossip ack +void messaging_service::register_gossip_digest_ack(std::function&& func) { + register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(func)); +} +void messaging_service::unregister_gossip_digest_ack() { + _rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_ACK); +} +future<> messaging_service::send_gossip_digest_ack(msg_addr id, gossip_digest_ack msg) { + return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(id), std::move(msg)); +} + +// gossip ack2 void messaging_service::register_gossip_digest_ack2(std::function&& func) { register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 006b9e4b0f..771bc38f05 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -234,9 +234,14 @@ public: future<> send_gossip_shutdown(msg_addr id, inet_address from); // Wrapper for GOSSIP_DIGEST_SYN - void register_gossip_digest_syn(std::function (gms::gossip_digest_syn)>&& func); + void register_gossip_digest_syn(std::function&& func); void unregister_gossip_digest_syn(); - future send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg); + future<> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg); + + // Wrapper for GOSSIP_DIGEST_ACK + void register_gossip_digest_ack(std::function&& func); + void unregister_gossip_digest_ack(); + future<> send_gossip_digest_ack(msg_addr id, gms::gossip_digest_ack msg); // Wrapper for GOSSIP_DIGEST_ACK2 void register_gossip_digest_ack2(std::function&& func); diff --git a/tests/message.cc b/tests/message.cc index 619eba68b8..1f09512d03 100644 --- a/tests/message.cc +++ b/tests/message.cc @@ -20,9 +20,10 @@ * along with Scylla. If not, see . */ -#include "core/reactor.hh" -#include "core/app-template.hh" -#include "core/sstring.hh" +#include +#include +#include +#include #include "message/messaging_service.hh" #include "gms/gossip_digest_syn.hh" #include "gms/gossip_digest_ack.hh" @@ -57,10 +58,13 @@ public: future<> stop() { return make_ready_future<>(); } + promise<> digest_test_done; public: void init_handler() { - ms.register_gossip_digest_syn([] (gms::gossip_digest_syn msg) { + ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) { print("Server got syn msg = %s\n", msg); + + auto from = net::messaging_service::get_source(cinfo); auto ep1 = inet_address("1.1.1.1"); auto ep2 = inet_address("2.2.2.2"); int32_t gen = 800; @@ -74,7 +78,26 @@ public: {ep2, endpoint_state()}, }; gms::gossip_digest_ack ack(std::move(digests), std::move(eps)); - return make_ready_future(std::move(ack)); + ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) { + print("Fail to send ack : %s", ep); + }); + return messaging_service::no_wait(); + }); + + ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) { + print("Server got ack msg = %s\n", msg); + auto from = net::messaging_service::get_source(cinfo); + // Prepare gossip_digest_ack2 message + auto ep1 = inet_address("3.3.3.3"); + std::map eps{ + {ep1, endpoint_state()}, + }; + gms::gossip_digest_ack2 ack2(std::move(eps)); + ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) { + print("Fail to send ack2 : %s", ep); + }); + digest_test_done.set_value(); + return messaging_service::no_wait(); }); ms.register_gossip_digest_ack2([] (gms::gossip_digest_ack2 msg) { @@ -108,18 +131,8 @@ public: {ep2, gen++, ver++}, }; gms::gossip_digest_syn syn("my_cluster", "my_partition", digests); - return ms.send_gossip_digest_syn(id, std::move(syn)).then([this, id] (gms::gossip_digest_ack ack) { - print("Client sent gossip_digest_syn got gossip_digest_ack reply = %s\n", ack); - // Prepare gossip_digest_ack2 message - auto ep1 = inet_address("3.3.3.3"); - std::map eps{ - {ep1, endpoint_state()}, - }; - gms::gossip_digest_ack2 ack2(std::move(eps)); - return ms.send_gossip_digest_ack2(id, std::move(ack2)).then([] () { - print("Client sent gossip_digest_ack2 got reply = void\n"); - return make_ready_future<>(); - }); + return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] { + return digest_test_done.get_future(); }); } @@ -169,6 +182,7 @@ int main(int ac, char ** av) { api_port++; } const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); + utils::fb_utilities::set_broadcast_address(listen); net::get_messaging_service().start(listen).then([config, api_port, stay_alive] () { auto testers = new distributed; testers->start().then([testers]{