mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 20:16:43 +00:00
gossip: Refactor waiting for features
This patch changes the sleep-based mechanism of detecting new features by instead registering waiters with a condition variable that is signaled whenever a new endpoint information is received. Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
@@ -57,6 +57,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <chrono>
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <boost/range/algorithm/set_algorithm.hpp>
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -596,6 +597,7 @@ void gossiper::run() {
|
||||
if (endpoint_map_changed || live_endpoint_changed || unreachable_endpoint_changed) {
|
||||
if (endpoint_map_changed) {
|
||||
shadow_endpoint_state_map = endpoint_state_map;
|
||||
_features_condvar.broadcast();
|
||||
}
|
||||
|
||||
if (live_endpoint_changed) {
|
||||
@@ -612,6 +614,7 @@ void gossiper::run() {
|
||||
if (engine().cpu_id() != 0) {
|
||||
if (endpoint_map_changed) {
|
||||
local_gossiper.endpoint_state_map = shadow_endpoint_state_map;
|
||||
local_gossiper._features_condvar.broadcast();
|
||||
}
|
||||
|
||||
if (live_endpoint_changed) {
|
||||
@@ -1524,6 +1527,7 @@ future<> gossiper::do_stop_gossiping() {
|
||||
get_local_failure_detector().unregister_failure_detection_event_listener(&g);
|
||||
}
|
||||
g.uninit_messaging_service_handler();
|
||||
g._features_condvar.broken();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
});
|
||||
@@ -1744,24 +1748,19 @@ std::set<sstring> gossiper::get_supported_features() const {
|
||||
return common_features;
|
||||
}
|
||||
|
||||
static future<stop_iteration> check_features(auto features, auto need_features) {
|
||||
static bool check_features(std::set<sstring> features, std::set<sstring> need_features) {
|
||||
logger.info("Checking if need_features {} in features {}", need_features, features);
|
||||
if (std::includes(features.begin(), features.end(), need_features.begin(), need_features.end())) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sleep(std::chrono::seconds(2)).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
return boost::range::includes(features, need_features);
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_feature_on_all_node(std::set<sstring> features) const {
|
||||
return repeat([this, features] {
|
||||
future<> gossiper::wait_for_feature_on_all_node(std::set<sstring> features) {
|
||||
return _features_condvar.wait([this, features = std::move(features)] {
|
||||
return check_features(get_supported_features(), features);
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_feature_on_node(std::set<sstring> features, inet_address endpoint) const {
|
||||
return repeat([this, features, endpoint] {
|
||||
future<> gossiper::wait_for_feature_on_node(std::set<sstring> features, inet_address endpoint) {
|
||||
return _features_condvar.wait([this, features = std::move(features), endpoint = std::move(endpoint)] {
|
||||
return check_features(get_supported_features(endpoint), features);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <set>
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -514,15 +515,17 @@ private:
|
||||
uint64_t _nr_run = 0;
|
||||
bool _ms_registered = false;
|
||||
bool _gossiped_to_seed = false;
|
||||
private:
|
||||
condition_variable _features_condvar;
|
||||
public:
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(inet_address endpoint) const;
|
||||
// Get features supported by all the nodes this node knows about
|
||||
std::set<sstring> get_supported_features() const;
|
||||
// Wait for features are available on all nodes this node knows about
|
||||
future<> wait_for_feature_on_all_node(std::set<sstring> features) const;
|
||||
future<> wait_for_feature_on_all_node(std::set<sstring> features);
|
||||
// Wait for features are available on a particular node
|
||||
future<> wait_for_feature_on_node(std::set<sstring> features, inet_address endpoint) const;
|
||||
future<> wait_for_feature_on_node(std::set<sstring> features, inet_address endpoint);
|
||||
};
|
||||
|
||||
extern distributed<gossiper> _the_gossiper;
|
||||
|
||||
Reference in New Issue
Block a user