From 4684b8ecbbb1e4409fb5ea7c7afc9ea1dccbb020 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 25 May 2016 19:34:51 +0000 Subject: [PATCH] gossip: Refactor waiting for features This patch changes the sleep-based mechanism of detecting new features by instead registering waiters with a condition variable that is signaled whenever a new endpoint information is received. Signed-off-by: Duarte Nunes --- gms/gossiper.cc | 21 ++++++++++----------- gms/gossiper.hh | 7 +++++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 9f13e24412..0ff3b770a6 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -57,6 +57,7 @@ #include #include #include "dht/i_partitioner.hh" +#include namespace gms { @@ -596,6 +597,7 @@ void gossiper::run() { if (endpoint_map_changed || live_endpoint_changed || unreachable_endpoint_changed) { if (endpoint_map_changed) { shadow_endpoint_state_map = endpoint_state_map; + _features_condvar.broadcast(); } if (live_endpoint_changed) { @@ -612,6 +614,7 @@ void gossiper::run() { if (engine().cpu_id() != 0) { if (endpoint_map_changed) { local_gossiper.endpoint_state_map = shadow_endpoint_state_map; + local_gossiper._features_condvar.broadcast(); } if (live_endpoint_changed) { @@ -1524,6 +1527,7 @@ future<> gossiper::do_stop_gossiping() { get_local_failure_detector().unregister_failure_detection_event_listener(&g); } g.uninit_messaging_service_handler(); + g._features_condvar.broken(); return make_ready_future<>(); }).get(); }); @@ -1744,24 +1748,19 @@ std::set gossiper::get_supported_features() const { return common_features; } -static future check_features(auto features, auto need_features) { +static bool check_features(std::set features, std::set need_features) { logger.info("Checking if need_features {} in features {}", need_features, features); - if (std::includes(features.begin(), features.end(), need_features.begin(), need_features.end())) { - return make_ready_future(stop_iteration::yes); - } - return sleep(std::chrono::seconds(2)).then([] { - return make_ready_future(stop_iteration::no); - }); + return boost::range::includes(features, need_features); } -future<> gossiper::wait_for_feature_on_all_node(std::set features) const { - return repeat([this, features] { +future<> gossiper::wait_for_feature_on_all_node(std::set features) { + return _features_condvar.wait([this, features = std::move(features)] { return check_features(get_supported_features(), features); }); } -future<> gossiper::wait_for_feature_on_node(std::set features, inet_address endpoint) const { - return repeat([this, features, endpoint] { +future<> gossiper::wait_for_feature_on_node(std::set features, inet_address endpoint) { + return _features_condvar.wait([this, features = std::move(features), endpoint = std::move(endpoint)] { return check_features(get_supported_features(endpoint), features); }); } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index a245a3b87c..42b5a04940 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -54,6 +54,7 @@ #include #include #include +#include namespace gms { @@ -514,15 +515,17 @@ private: uint64_t _nr_run = 0; bool _ms_registered = false; bool _gossiped_to_seed = false; +private: + condition_variable _features_condvar; public: // Get features supported by a particular node std::set get_supported_features(inet_address endpoint) const; // Get features supported by all the nodes this node knows about std::set get_supported_features() const; // Wait for features are available on all nodes this node knows about - future<> wait_for_feature_on_all_node(std::set features) const; + future<> wait_for_feature_on_all_node(std::set features); // Wait for features are available on a particular node - future<> wait_for_feature_on_node(std::set features, inet_address endpoint) const; + future<> wait_for_feature_on_node(std::set features, inet_address endpoint); }; extern distributed _the_gossiper;