From d799d1aa8ff06e6ded3d2ffa27aa22cea92de3b3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 29 Jul 2015 11:13:35 +0800 Subject: [PATCH] gossip: Implement the missing sleep logic --- gms/gossiper.cc | 106 ++++++++++++++++++++++++------------------------ gms/gossiper.hh | 6 +-- 2 files changed, 57 insertions(+), 55 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 6d1e9c835f..a75f2aabbe 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -659,43 +659,43 @@ void gossiper::make_random_gossip_digest(std::vector& g_digests) #endif } -void gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) { - auto& state = endpoint_state_map.at(endpoint); - // remember this node's generation - int generation = state.get_heart_beat_state().get_generation(); - logger.info("Removing host: {}", host_id); - logger.info("Sleeping for {}ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint); - // FIXME: sleep - warn(unimplemented::cause::GOSSIP); - // Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS); - // make sure it did not change - auto& eps = endpoint_state_map.at(endpoint); - if (eps.get_heart_beat_state().get_generation() != generation) { - throw std::runtime_error(sprint("Endpoint %s generation changed while trying to remove it", endpoint)); - } +future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) { + return seastar::async([this, endpoint, host_id, local_host_id] { + auto& state = endpoint_state_map.at(endpoint); + // remember this node's generation + int generation = state.get_heart_beat_state().get_generation(); + logger.info("Removing host: {}", host_id); + logger.info("Sleeping for {}ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint); + sleep(storage_service_ring_delay()).get(); + // make sure it did not change + auto& eps = endpoint_state_map.at(endpoint); + if (eps.get_heart_beat_state().get_generation() != generation) { + throw std::runtime_error(sprint("Endpoint %s generation changed while trying to remove it", endpoint)); + } - // update the other node's generation to mimic it as if it had changed it itself - logger.info("Advertising removal for {}", endpoint); - eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - eps.add_application_state(application_state::STATUS, storage_service_value_factory().removing_nonlocal(host_id)); - eps.add_application_state(application_state::REMOVAL_COORDINATOR, storage_service_value_factory().removal_coordinator(local_host_id)); - endpoint_state_map[endpoint] = eps; + // update the other node's generation to mimic it as if it had changed it itself + logger.info("Advertising removal for {}", endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + eps.add_application_state(application_state::STATUS, storage_service_value_factory().removing_nonlocal(host_id)); + eps.add_application_state(application_state::REMOVAL_COORDINATOR, storage_service_value_factory().removal_coordinator(local_host_id)); + endpoint_state_map[endpoint] = eps; + }); } -void gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) { - auto& eps = endpoint_state_map.at(endpoint); - eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - auto expire_time = compute_expire_time(); - eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time.time_since_epoch().count())); - logger.info("Completing removal of {}", endpoint); - add_expire_time_for_endpoint(endpoint, expire_time); - endpoint_state_map[endpoint] = eps; - // ensure at least one gossip round occurs before returning - // FIXME: sleep - //Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS); - warn(unimplemented::cause::GOSSIP); +future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) { + return seastar::async([this, endpoint, host_id] { + auto& eps = endpoint_state_map.at(endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + auto expire_time = compute_expire_time(); + eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time.time_since_epoch().count())); + logger.info("Completing removal of {}", endpoint); + add_expire_time_for_endpoint(endpoint, expire_time); + endpoint_state_map[endpoint] = eps; + // ensure at least one gossip round occurs before returning + sleep(INTERVAL * 2).get(); + }); } future<> gossiper::unsafe_assassinate_endpoint(sstring address) { @@ -1265,24 +1265,26 @@ void gossiper::add_lccal_application_states(std::list gossiper::shutdown() { + return seastar::async([this] { + warn(unimplemented::cause::GOSSIP); + // if (scheduledGossipTask != null) + // scheduledGossipTask.cancel(false); + logger.info("Announcing shutdown"); + sleep(INTERVAL * 2).get(); + for (inet_address addr : _live_endpoints) { + shard_id id = get_shard_id(addr); + logger.trace("Sending a GossipShutdown to {}", id); + ms().send_gossip_shutdown(id, addr).then_wrapped([id] (auto&&f) { + try { + f.get(); + logger.trace("Got GossipShutdown Reply"); + } catch (...) { + logger.error("Fail to send GossipShutdown to {}: {}", id, std::current_exception()); + } + }); + } + }); } future<> gossiper::stop() { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index fda3abdeb2..bb0a7dbbb8 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -267,7 +267,7 @@ public: * @param host_id - the ID of the host being removed * @param local_host_id - my own host ID for replication coordination */ - void advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id); + future<> advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id); /** * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN @@ -276,7 +276,7 @@ public: * @param endpoint * @param host_id */ - void advertise_token_removed(inet_address endpoint, utils::UUID host_id); + future<> advertise_token_removed(inet_address endpoint, utils::UUID host_id); future<> unsafe_assassinate_endpoint(sstring address); @@ -421,7 +421,7 @@ public: void add_lccal_application_states(std::list> states); - void shutdown(); + future<> shutdown(); future<> stop();