gossip: Implement the missing sleep logic

This commit is contained in:
Asias He
2015-07-29 11:13:35 +08:00
parent 1a6f7cf2aa
commit d799d1aa8f
2 changed files with 57 additions and 55 deletions

View File

@@ -659,43 +659,43 @@ void gossiper::make_random_gossip_digest(std::vector<gossip_digest>& g_digests)
#endif
}
void gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
auto& state = endpoint_state_map.at(endpoint);
// remember this node's generation
int generation = state.get_heart_beat_state().get_generation();
logger.info("Removing host: {}", host_id);
logger.info("Sleeping for {}ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint);
// FIXME: sleep
warn(unimplemented::cause::GOSSIP);
// Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
// make sure it did not change
auto& eps = endpoint_state_map.at(endpoint);
if (eps.get_heart_beat_state().get_generation() != generation) {
throw std::runtime_error(sprint("Endpoint %s generation changed while trying to remove it", endpoint));
}
future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
return seastar::async([this, endpoint, host_id, local_host_id] {
auto& state = endpoint_state_map.at(endpoint);
// remember this node's generation
int generation = state.get_heart_beat_state().get_generation();
logger.info("Removing host: {}", host_id);
logger.info("Sleeping for {}ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint);
sleep(storage_service_ring_delay()).get();
// make sure it did not change
auto& eps = endpoint_state_map.at(endpoint);
if (eps.get_heart_beat_state().get_generation() != generation) {
throw std::runtime_error(sprint("Endpoint %s generation changed while trying to remove it", endpoint));
}
// update the other node's generation to mimic it as if it had changed it itself
logger.info("Advertising removal for {}", endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removing_nonlocal(host_id));
eps.add_application_state(application_state::REMOVAL_COORDINATOR, storage_service_value_factory().removal_coordinator(local_host_id));
endpoint_state_map[endpoint] = eps;
// update the other node's generation to mimic it as if it had changed it itself
logger.info("Advertising removal for {}", endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removing_nonlocal(host_id));
eps.add_application_state(application_state::REMOVAL_COORDINATOR, storage_service_value_factory().removal_coordinator(local_host_id));
endpoint_state_map[endpoint] = eps;
});
}
void gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) {
auto& eps = endpoint_state_map.at(endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
endpoint_state_map[endpoint] = eps;
// ensure at least one gossip round occurs before returning
// FIXME: sleep
//Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS);
warn(unimplemented::cause::GOSSIP);
future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) {
return seastar::async([this, endpoint, host_id] {
auto& eps = endpoint_state_map.at(endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
endpoint_state_map[endpoint] = eps;
// ensure at least one gossip round occurs before returning
sleep(INTERVAL * 2).get();
});
}
future<> gossiper::unsafe_assassinate_endpoint(sstring address) {
@@ -1265,24 +1265,26 @@ void gossiper::add_lccal_application_states(std::list<std::pair<application_stat
}
}
void gossiper::shutdown() {
warn(unimplemented::cause::GOSSIP);
// if (scheduledGossipTask != null)
// scheduledGossipTask.cancel(false);
logger.info("Announcing shutdown");
// Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS);
for (inet_address addr : _live_endpoints) {
shard_id id = get_shard_id(addr);
logger.trace("Sending a GossipShutdown to {}", id);
ms().send_gossip_shutdown(id, addr).then_wrapped([id] (auto&&f) {
try {
f.get();
logger.trace("Got GossipShutdown Reply");
} catch (...) {
logger.error("Fail to send GossipShutdown to {}: {}", id, std::current_exception());
}
});
}
future<> gossiper::shutdown() {
return seastar::async([this] {
warn(unimplemented::cause::GOSSIP);
// if (scheduledGossipTask != null)
// scheduledGossipTask.cancel(false);
logger.info("Announcing shutdown");
sleep(INTERVAL * 2).get();
for (inet_address addr : _live_endpoints) {
shard_id id = get_shard_id(addr);
logger.trace("Sending a GossipShutdown to {}", id);
ms().send_gossip_shutdown(id, addr).then_wrapped([id] (auto&&f) {
try {
f.get();
logger.trace("Got GossipShutdown Reply");
} catch (...) {
logger.error("Fail to send GossipShutdown to {}: {}", id, std::current_exception());
}
});
}
});
}
future<> gossiper::stop() {

View File

@@ -267,7 +267,7 @@ public:
* @param host_id - the ID of the host being removed
* @param local_host_id - my own host ID for replication coordination
*/
void advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id);
future<> advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id);
/**
* Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
@@ -276,7 +276,7 @@ public:
* @param endpoint
* @param host_id
*/
void advertise_token_removed(inet_address endpoint, utils::UUID host_id);
future<> advertise_token_removed(inet_address endpoint, utils::UUID host_id);
future<> unsafe_assassinate_endpoint(sstring address);
@@ -421,7 +421,7 @@ public:
void add_lccal_application_states(std::list<std::pair<application_state, versioned_value>> states);
void shutdown();
future<> shutdown();
future<> stop();