gossip: Make send_gossip and friends return future

This commit is contained in:
Asias He
2015-08-06 17:50:43 +08:00
parent 3b064c528e
commit c6509dad42
2 changed files with 20 additions and 18 deletions

View File

@@ -228,11 +228,11 @@ void gossiper::init_messaging_service_handler() {
});
}
bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> epset) {
future<bool> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> epset) {
std::vector<inet_address> __live_endpoints(epset.begin(), epset.end());
size_t size = __live_endpoints.size();
if (size < 1) {
return false;
return make_ready_future<bool>(false);
}
/* Generate a random number from 0 -> size */
std::uniform_int_distribution<int> dist(0, size - 1);
@@ -253,7 +253,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> eps
}
});
return _seeds.count(to);
return make_ready_future<bool>(_seeds.count(to));
}
@@ -439,10 +439,10 @@ void gossiper::run() {
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), g_digests);
/* Gossip to some random live member */
bool gossiped_to_seed = do_gossip_to_live_member(message);
bool gossiped_to_seed = std::get<0>(do_gossip_to_live_member(message).get());
/* Gossip to some unreachable member with some probability to check if he is back up */
do_gossip_to_unreachable_member(message);
do_gossip_to_unreachable_member(message).get();
/* Gossip to a seed if we did not do so above, or we have seen less nodes
than there are seeds. This prevents partitions where each group of nodes
@@ -461,7 +461,7 @@ void gossiper::run() {
See CASSANDRA-150 for more exposition. */
if (!gossiped_to_seed || _live_endpoints.size() < _seeds.size()) {
do_gossip_to_seed(message);
do_gossip_to_seed(message).get();
}
do_status_check();
@@ -766,16 +766,16 @@ int gossiper::get_current_generation_number(inet_address endpoint) {
return endpoint_state_map.at(endpoint).get_heart_beat_state().get_generation();
}
bool gossiper::do_gossip_to_live_member(gossip_digest_syn message) {
future<bool> gossiper::do_gossip_to_live_member(gossip_digest_syn message) {
size_t size = _live_endpoints.size();
if (size == 0) {
return false;
return make_ready_future<bool>(false);
}
logger.trace("do_gossip_to_live_member: live_endpoint nr={}", _live_endpoints.size());
return send_gossip(message, _live_endpoints);
}
void gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
double live_endpoint_count = _live_endpoints.size();
double unreachable_endpoint_count = _unreachable_endpoints.size();
if (unreachable_endpoint_count > 0) {
@@ -790,21 +790,22 @@ void gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
}
logger.trace("do_gossip_to_unreachable_member: live_endpoint nr={} unreachable_endpoints nr={}",
live_endpoint_count, unreachable_endpoint_count);
send_gossip(message, addrs);
return send_gossip(message, addrs).discard_result();
}
}
return make_ready_future<>();
}
void gossiper::do_gossip_to_seed(gossip_digest_syn prod) {
future<> gossiper::do_gossip_to_seed(gossip_digest_syn prod) {
size_t size = _seeds.size();
if (size > 0) {
if (size == 1 && _seeds.count(get_broadcast_address())) {
return;
return make_ready_future<>();
}
if (_live_endpoints.size() == 0) {
logger.trace("do_gossip_to_seed: live_endpoints nr={}, seeds nr={}", 0, _seeds.size());
send_gossip(prod, _seeds);
return send_gossip(prod, _seeds).discard_result();
} else {
/* Gossip with the seed with some probability. */
double probability = _seeds.size() / (double) (_live_endpoints.size() + _unreachable_endpoints.size());
@@ -812,10 +813,11 @@ void gossiper::do_gossip_to_seed(gossip_digest_syn prod) {
double rand_dbl = dist(_random);
if (rand_dbl <= probability) {
logger.trace("do_gossip_to_seed: live_endpoints nr={}, seeds nr={}", _live_endpoints.size(), _seeds.size());
send_gossip(prod, _seeds);
return send_gossip(prod, _seeds).discard_result();
}
}
}
return make_ready_future<>();
}
bool gossiper::is_gossip_only_member(inet_address endpoint) {

View File

@@ -306,16 +306,16 @@ private:
* @param epSet a set of endpoint from which a random endpoint is chosen.
* @return true if the chosen endpoint is also a seed.
*/
bool send_gossip(gossip_digest_syn message, std::set<inet_address> epset);
future<bool> send_gossip(gossip_digest_syn message, std::set<inet_address> epset);
/* Sends a Gossip message to a live member and returns true if the recipient was a seed */
bool do_gossip_to_live_member(gossip_digest_syn message);
future<bool> do_gossip_to_live_member(gossip_digest_syn message);
/* Sends a Gossip message to an unreachable member */
void do_gossip_to_unreachable_member(gossip_digest_syn message);
future<> do_gossip_to_unreachable_member(gossip_digest_syn message);
/* Gossip to a seed for facilitating partition healing */
void do_gossip_to_seed(gossip_digest_syn prod);
future<> do_gossip_to_seed(gossip_digest_syn prod);
void do_status_check();