From f689ef705a6ada0cce0ee15fcacce71fa264c6a3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 4 May 2015 21:45:38 +0800 Subject: [PATCH] gossip: Forward gossip message to cpu0 There is one gossiper instance per node and it runs on cpu0 only. We can not guarantee there will always be a core to core tcp connection within messaging service, so messaging service needs to listen on all cpus. When a remote node connects to local node with a connection bound to cpu other than cpu0, we need to forward this message to cpu0. --- gms/gossiper.cc | 130 ++++++++++++++++++++++++----------------- gms/gossiper.hh | 7 ++- tests/urchin/gossip.cc | 38 ++++++------ 3 files changed, 101 insertions(+), 74 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index ccb079130d..86d0b73a34 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -42,7 +42,6 @@ gossiper::gossiper() /* register with the Failure Detector for receiving Failure detector events */ get_local_failure_detector().register_failure_detection_event_listener(this->shared_from_this()); // Register this instance with JMX - init_messaging_service_handler(); } /* @@ -84,48 +83,64 @@ void gossiper::do_sort(std::vector& g_digest_list) { } } +future gossiper::handle_syn_msg(gossip_digest_syn syn_msg) { + this->set_last_processed_message_at(now_millis()); + inet_address from; + if (!this->is_enabled()) { + return make_ready_future(gossip_digest_ack()); + } + + /* If the message is from a different cluster throw it away. */ + if (syn_msg.cluster_id() != get_cluster_name()) { + return make_ready_future(gossip_digest_ack()); + } + + if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) { + return make_ready_future(gossip_digest_ack()); + } + + auto g_digest_list = syn_msg.get_gossip_digests(); + do_sort(g_digest_list); + std::vector delta_gossip_digest_list; + std::map delta_ep_state_map; + this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map); + gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map)); + return make_ready_future(std::move(ack_msg)); +} + void gossiper::init_messaging_service_handler() { - ms().register_handler(messaging_verb::ECHO, [this] (empty_msg msg) { + ms().register_handler(messaging_verb::ECHO, [] (empty_msg msg) { // TODO: Use time_point instead of long for timing. - this->set_last_processed_message_at(now_millis()); - return make_ready_future(); + return smp::submit_to(0, [msg = std::move(msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_last_processed_message_at(now_millis()); + return make_ready_future(); + }); }); - ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [this] (inet_address from) { - this->set_last_processed_message_at(now_millis()); - // TODO: Implement processing of incoming SHUTDOWN message - get_local_failure_detector().force_conviction(from); + ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [] (inet_address from) { + smp::submit_to(0, [from] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_last_processed_message_at(now_millis()); + // TODO: Implement processing of incoming SHUTDOWN message + get_local_failure_detector().force_conviction(from); + }).discard_result(); return messaging_service::no_wait(); }); - ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [this] (gossip_digest_syn syn_msg) { - this->set_last_processed_message_at(now_millis()); - inet_address from; - if (!this->is_enabled()) { - return make_ready_future(gossip_digest_ack()); - } - - /* If the message is from a different cluster throw it away. */ - if (syn_msg.cluster_id() != get_cluster_name()) { - return make_ready_future(gossip_digest_ack()); - } - - if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) { - return make_ready_future(gossip_digest_ack()); - } - - auto g_digest_list = syn_msg.get_gossip_digests(); - do_sort(g_digest_list); - std::vector delta_gossip_digest_list; - std::map delta_ep_state_map; - this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map); - gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map)); - return make_ready_future(std::move(ack_msg)); + ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gossip_digest_syn syn_msg) { + return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + return gossiper.handle_syn_msg(std::move(syn_msg)); + }); }); - ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [this] (gossip_digest_ack2 msg) { - this->set_last_processed_message_at(now_millis()); - auto& remote_ep_state_map = msg.get_endpoint_state_map(); - /* Notify the Failure Detector */ - this->notify_failure_detector(remote_ep_state_map); - this->apply_state_locally(remote_ep_state_map); + ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [] (gossip_digest_ack2 msg) { + smp::submit_to(0, [msg = std::move(msg)] () mutable { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_last_processed_message_at(now_millis()); + auto& remote_ep_state_map = msg.get_endpoint_state_map(); + /* Notify the Failure Detector */ + gossiper.notify_failure_detector(remote_ep_state_map); + gossiper.apply_state_locally(remote_ep_state_map); + }).discard_result(); return messaging_service::no_wait(); }); } @@ -1024,27 +1039,36 @@ void gossiper::examine_gossiper(std::vector& g_digest_list, } } -void gossiper::start(int generation_number) { - start(generation_number, std::map()); +future<> gossiper::start(int generation_number) { + return start(generation_number, std::map()); } -void gossiper::start(int generation_nbr, std::map preload_local_states) { - build_seeds_list(); - /* initialize the heartbeat state for this localEndpoint */ - maybe_initialize_local_state(generation_nbr); - endpoint_state& local_state = endpoint_state_map[get_broadcast_address()]; - for (auto& entry : preload_local_states) { - local_state.add_application_state(entry.first, entry.second); - } +future<> gossiper::start(int generation_nbr, std::map preload_local_states) { + // Although gossiper runs on cpu0 only, we need to listen incoming gossip + // message on all cpus and forard them to cpu0 to process. + return _handlers.start().then([this] { + return _handlers.invoke_on_all([this] (handler& h) { + this->init_messaging_service_handler(); + }); + }).then([this, generation_nbr, preload_local_states] { + build_seeds_list(); + /* initialize the heartbeat state for this localEndpoint */ + maybe_initialize_local_state(generation_nbr); + endpoint_state& local_state = endpoint_state_map[get_broadcast_address()]; + for (auto& entry : preload_local_states) { + local_state.add_application_state(entry.first, entry.second); + } - //notify snitches that Gossiper is about to start + //notify snitches that Gossiper is about to start #if 0 - DatabaseDescriptor.getEndpointSnitch().gossiperStarting(); - if (logger.isTraceEnabled()) - logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation()); + DatabaseDescriptor.getEndpointSnitch().gossiperStarting(); + if (logger.isTraceEnabled()) + logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation()); #endif - std::chrono::milliseconds period(INTERVAL_IN_MILLIS); - _scheduled_gossip_task.arm_periodic(period); + std::chrono::milliseconds period(INTERVAL_IN_MILLIS); + _scheduled_gossip_task.arm_periodic(period); + return make_ready_future<>(); + }); } void gossiper::do_shadow_round() { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 7bcd434631..5678b8021d 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -80,7 +80,10 @@ private: net::messaging_service& ms() { return net::get_local_messaging_service(); } + class handler {}; + distributed _handlers; void init_messaging_service_handler(); + future handle_syn_msg(gossip_digest_syn syn_msg); static constexpr const uint32_t _default_cpuid = 0; shard_id get_shard_id(inet_address to) { return shard_id{to, _default_cpuid}; @@ -396,12 +399,12 @@ public: std::map& delta_ep_state_map); public: - void start(int generation_number); + future<> start(int generation_number); /** * Start the gossiper with the generation number, preloading the map of application states before starting */ - void start(int generation_nbr, std::map preload_local_states); + future<> start(int generation_nbr, std::map preload_local_states); public: /** diff --git a/tests/urchin/gossip.cc b/tests/urchin/gossip.cc index 2d6331649a..32ee032716 100644 --- a/tests/urchin/gossip.cc +++ b/tests/urchin/gossip.cc @@ -37,27 +37,27 @@ int main(int ac, char ** av) { }; int generation_number = 1; - gossiper.start(generation_number, app_states); + gossiper.start(generation_number, app_states).then([] () { + auto reporter = std::make_shared>(); + reporter->set_callback ([reporter] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.dump_endpoint_state_map(); + auto& fd = gms::get_local_failure_detector(); + print("%s", fd); + }); + reporter->arm_periodic(std::chrono::milliseconds(1000)); - auto reporter = std::make_shared>(); - reporter->set_callback ([reporter] { - auto& gossiper = gms::get_local_gossiper(); - gossiper.dump_endpoint_state_map(); - auto& fd = gms::get_local_failure_detector(); - print("%s", fd); + auto app_state_adder = std::make_shared>(); + app_state_adder->set_callback ([app_state_adder] { + static double load = 0.5; + auto& gossiper = gms::get_local_gossiper(); + auto state = gms::application_state::LOAD; + auto value = gms::versioned_value::versioned_value_factory::load(load); + gossiper.add_local_application_state(state, value); + load += 0.0001; + }); + app_state_adder->arm_periodic(std::chrono::seconds(1)); }); - reporter->arm_periodic(std::chrono::milliseconds(1000)); - - auto app_state_adder = std::make_shared>(); - app_state_adder->set_callback ([app_state_adder] { - static double load = 0.5; - auto& gossiper = gms::get_local_gossiper(); - auto state = gms::application_state::LOAD; - auto value = gms::versioned_value::versioned_value_factory::load(load); - gossiper.add_local_application_state(state, value); - load += 0.0001; - }); - app_state_adder->arm_periodic(std::chrono::seconds(1)); }); }); });