From 0ce3d89a8538af3fbf541bfb3b24cbc3faaa58db Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 28 Jul 2015 19:56:27 +0800 Subject: [PATCH] gossip: Switch to use chrono for time operation This is a long-awaited cleanup. Gossiper code runs every second, it is not performance sensitive, so it does not make much sense to stick to lowres db_clock, use high_resolution_clock instead. --- gms/endpoint_state.hh | 14 ++++--- gms/gossiper.cc | 89 ++++++++++++++++++++++--------------------- gms/gossiper.hh | 40 +++++++++---------- 3 files changed, 73 insertions(+), 70 deletions(-) diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 0e5996e4c4..c0ba960a87 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -26,8 +26,8 @@ #include "gms/heart_beat_state.hh" #include "gms/application_state.hh" #include "gms/versioned_value.hh" -#include "db_clock.hh" #include +#include namespace gms { @@ -36,11 +36,13 @@ namespace gms { * instance. Any state for a given endpoint can be retrieved from this instance. */ class endpoint_state { +public: + using clk = std::chrono::high_resolution_clock; private: heart_beat_state _heart_beat_state; std::map _application_state; /* fields below do not get serialized */ - db_clock::time_point _update_timestamp; + clk::time_point _update_timestamp; bool _is_alive; public: bool operator==(const endpoint_state& other) const { @@ -52,13 +54,13 @@ public: endpoint_state() : _heart_beat_state(0) - , _update_timestamp(db_clock::now()) + , _update_timestamp(clk::now()) , _is_alive(true) { } endpoint_state(heart_beat_state initial_hb_state) : _heart_beat_state(initial_hb_state) - , _update_timestamp(db_clock::now()) + , _update_timestamp(clk::now()) , _is_alive(true) { } @@ -96,12 +98,12 @@ public: /** * @return System.nanoTime() when state was updated last time. */ - db_clock::time_point get_update_timestamp() { + clk::time_point get_update_timestamp() { return _update_timestamp; } void update_timestamp() { - _update_timestamp = db_clock::now(); + _update_timestamp = clk::now(); } bool is_alive() { diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 8b41eb089c..5fac08a10e 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -34,17 +34,20 @@ #include "gms/i_failure_detector.hh" #include "service/storage_service.hh" #include "log.hh" +#include namespace gms { +using clk = gossiper::clk; + logging::logger logger("gossip"); -constexpr int gossiper::INTERVAL_IN_MILLIS; -constexpr int64_t gossiper::A_VERY_LONG_TIME; +constexpr std::chrono::milliseconds gossiper::INTERVAL; +constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME; constexpr int64_t gossiper::MAX_GENERATION_DIFFERENCE; -int gossiper::quarantine_delay() { - return service::storage_service::RING_DELAY * 2; +std::chrono::milliseconds gossiper::quarantine_delay() { + return std::chrono::milliseconds(service::storage_service::RING_DELAY * 2); } // FIXME: StorageService.instance.valueFactory @@ -61,7 +64,7 @@ gossiper::gossiper() { _scheduled_gossip_task.set_callback([this] { run(); }); // half of QUARATINE_DELAY, to ensure _just_removed_endpoints has enough leeway to prevent re-gossip - fat_client_timeout = (int64_t) (quarantine_delay() / 2); + fat_client_timeout = quarantine_delay() / 2; /* register with the Failure Detector for receiving Failure detector events */ get_local_failure_detector().register_failure_detection_event_listener(this); // Register this instance with JMX @@ -107,7 +110,7 @@ void gossiper::do_sort(std::vector& g_digest_list) { } future gossiper::handle_syn_msg(gossip_digest_syn syn_msg) { - this->set_last_processed_message_at(now_millis()); + this->set_last_processed_message_at(now()); inet_address from; if (!this->is_enabled()) { return make_ready_future(gossip_digest_ack()); @@ -136,14 +139,14 @@ void gossiper::init_messaging_service_handler() { // TODO: Use time_point instead of long for timing. return smp::submit_to(0, [] { auto& gossiper = gms::get_local_gossiper(); - gossiper.set_last_processed_message_at(now_millis()); + gossiper.set_last_processed_message_at(now()); return make_ready_future<>(); }); }); ms().register_gossip_shutdown([] (inet_address from) { smp::submit_to(0, [from] { auto& gossiper = gms::get_local_gossiper(); - gossiper.set_last_processed_message_at(now_millis()); + gossiper.set_last_processed_message_at(now()); // TODO: Implement processing of incoming SHUTDOWN message get_local_failure_detector().force_conviction(from); }).discard_result(); @@ -158,7 +161,7 @@ void gossiper::init_messaging_service_handler() { ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { smp::submit_to(0, [msg = std::move(msg)] () mutable { auto& gossiper = gms::get_local_gossiper(); - gossiper.set_last_processed_message_at(now_millis()); + gossiper.set_last_processed_message_at(now()); auto& remote_ep_state_map = msg.get_endpoint_state_map(); /* Notify the Failure Detector */ gossiper.notify_failure_detector(remote_ep_state_map); @@ -184,7 +187,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set eps try { auto ack_msg = f.get0(); logger.trace("Got GossipDigestSyn Reply"); - this->set_last_processed_message_at(now_millis()); + this->set_last_processed_message_at(now()); if (!this->is_enabled() && !this->is_in_shadow_round()) { return; } @@ -351,7 +354,7 @@ void gossiper::remove_endpoint(inet_address endpoint) { void gossiper::do_status_check() { logger.trace("Performing status check ..."); - int64_t now = now_millis(); + auto now = this->now(); for (auto& entry : endpoint_state_map) { const inet_address& endpoint = entry.first; @@ -368,17 +371,17 @@ void gossiper::do_status_check() { // gossip after FatClientTimeout. Do not remove dead states here. if (is_gossip_only_member(endpoint) && !_just_removed_endpoints.count(endpoint) - && ((now - ep_state.get_update_timestamp().time_since_epoch().count()) > fat_client_timeout)) { - logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fat_client_timeout); + && ((now - ep_state.get_update_timestamp()) > fat_client_timeout)) { + logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fat_client_timeout.count()); remove_endpoint(endpoint); // will put it in _just_removed_endpoints to respect quarantine delay evict_from_membershipg(endpoint); // can get rid of the state immediately } // check for dead state removal - int64_t expire_time = get_expire_time_for_endpoint(endpoint); + auto expire_time = get_expire_time_for_endpoint(endpoint); if (!ep_state.is_alive() && (now > expire_time) && (!service::get_local_storage_service().get_token_metadata().is_member(endpoint))) { - logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time); + logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count()); evict_from_membershipg(endpoint); } } @@ -387,7 +390,7 @@ void gossiper::do_status_check() { for (auto it = _just_removed_endpoints.begin(); it != _just_removed_endpoints.end();) { auto& t= it->second; if ((now - t) > quarantine_delay()) { - logger.debug("{} elapsed, {} gossip quarantine over", quarantine_delay(), it->first); + logger.debug("{} ms elapsed, {} gossip quarantine over", quarantine_delay().count(), it->first); it = _just_removed_endpoints.erase(it); } else { it++; @@ -453,8 +456,6 @@ void gossiper::run() { // them across all other shards. // - Reschedule the gossiper only after execution on all nodes is done. // - std::chrono::milliseconds period(INTERVAL_IN_MILLIS); - bool endpoint_map_changed = (_shadow_endpoint_state_map != endpoint_state_map); bool live_endpoint_changed = @@ -483,9 +484,9 @@ void gossiper::run() { local_gossiper._live_endpoints = _shadow_live_endpoints; } } - }).then([this, period] { _scheduled_gossip_task.arm(period); }); + }).then([this] { _scheduled_gossip_task.arm(INTERVAL); }); } else { - _scheduled_gossip_task.arm(period); + _scheduled_gossip_task.arm(INTERVAL); } } @@ -543,11 +544,12 @@ std::set gossiper::get_unreachable_token_owners() { return token_owners; } +// Return downtime in microseconds int64_t gossiper::get_endpoint_downtime(inet_address ep) { auto it = _unreachable_endpoints.find(ep); if (it != _unreachable_endpoints.end()) { auto& downtime = it->second; - return (now_nanos() - downtime) / 1000; + return std::chrono::duration_cast(now() - downtime).count(); } else { return 0L; } @@ -593,17 +595,17 @@ void gossiper::evict_from_membershipg(inet_address endpoint) { } void gossiper::quarantine_endpoint(inet_address endpoint) { - quarantine_endpoint(endpoint, now_millis()); + quarantine_endpoint(endpoint, now()); } -void gossiper::quarantine_endpoint(inet_address endpoint, int64_t quarantine_expiration) { +void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quarantine_expiration) { _just_removed_endpoints[endpoint] = quarantine_expiration; } void gossiper::replacement_quarantine(inet_address endpoint) { // remember, quarantine_endpoint will effectively already add QUARANTINE_DELAY, so this is 2x // logger.debug(""); - quarantine_endpoint(endpoint, now_millis() + quarantine_delay()); + quarantine_endpoint(endpoint, now() + quarantine_delay()); } void gossiper::replaced_endpoint(inet_address endpoint) { @@ -672,14 +674,14 @@ void gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_i auto& eps = endpoint_state_map.at(endpoint); eps.update_timestamp(); // make sure we don't evict it too soon eps.get_heart_beat_state().force_newer_generation_unsafe(); - int64_t expire_time = compute_expire_time(); - eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time)); + auto expire_time = compute_expire_time(); + eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time.time_since_epoch().count())); logger.info("Completing removal of {}", endpoint); add_expire_time_for_endpoint(endpoint, expire_time); endpoint_state_map[endpoint] = eps; // ensure at least one gossip round occurs before returning // FIXME: sleep - //Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS); + //Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS); warn(unimplemented::cause::GOSSIP); } @@ -691,8 +693,10 @@ void gossiper::unsafe_assassinate_endpoint(sstring address) { void gossiper::assassinate_endpoint(sstring address) { inet_address endpoint(address); auto is_exist = endpoint_state_map.count(endpoint); + int gen = std::chrono::duration_cast((now() + std::chrono::seconds(60)).time_since_epoch()).count(); + int ver = 9999; endpoint_state&& ep_state = is_exist ? endpoint_state_map.at(endpoint) : - endpoint_state(heart_beat_state((int) ((now_millis() + 60000) / 1000), 9999)); + endpoint_state(heart_beat_state(gen, ver)); //Collection tokens = null; logger.warn("Assassinating {} via gossip", endpoint); if (is_exist) { @@ -733,7 +737,7 @@ void gossiper::assassinate_endpoint(sstring address) { // FIXME: StorageService.instance and Sleep // ep_state.add_application_state(application_state::STATUS, StorageService.instance.valueFactory.left(tokens, compute_expire_time())); handle_major_state_change(endpoint, ep_state); - // Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 4, TimeUnit.MILLISECONDS); + // Uninterruptibles.sleepUninterruptibly(INTERVAL * 4, TimeUnit.MILLISECONDS); logger.warn("Finished assassinating {}", endpoint); } @@ -807,13 +811,13 @@ bool gossiper::is_gossip_only_member(inet_address endpoint) { return !is_dead_state(eps) && !ss.get_token_metadata().is_member(endpoint); } -int64_t gossiper::get_expire_time_for_endpoint(inet_address endpoint) { +clk::time_point gossiper::get_expire_time_for_endpoint(inet_address endpoint) { /* default expire_time is A_VERY_LONG_TIME */ auto it = _expire_time_endpoint_map.find(endpoint); if (it == _expire_time_endpoint_map.end()) { return compute_expire_time(); } else { - int64_t stored_time = it->second; + auto stored_time = it->second; return stored_time; } } @@ -921,7 +925,7 @@ void gossiper::mark_alive(inet_address addr, endpoint_state local_state) { try { f.get(); logger.trace("Got EchoMessage Reply"); - this->set_last_processed_message_at(now_millis()); + this->set_last_processed_message_at(now()); this->real_mark_alive(id.addr, local_state); } catch (...) { logger.error("Fail to send EchoMessage to {}: {}", id, std::current_exception()); @@ -948,7 +952,7 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { logger.trace("marking as down {}", addr); local_state.mark_dead(); _live_endpoints.erase(addr); - _unreachable_endpoints[addr] = now_nanos(); + _unreachable_endpoints[addr] = now(); logger.info("inet_address {} is now DOWN", addr); for (auto& subscriber : _subscribers) { subscriber->on_dead(addr, local_state); @@ -1141,8 +1145,7 @@ future<> gossiper::start(int generation_nbr, std::map(); }); } @@ -1160,7 +1163,7 @@ void gossiper::do_shadow_round() { try { auto ack_msg = f.get0(); logger.trace("Got GossipDigestSyn Reply"); - this->set_last_processed_message_at(now_millis()); + this->set_last_processed_message_at(now()); if (this->is_in_shadow_round()) { this->finish_shadow_round(); } @@ -1223,7 +1226,7 @@ void gossiper::add_saved_endpoint(inet_address ep) { } ep_state.mark_dead(); endpoint_state_map[ep] = ep_state; - _unreachable_endpoints[ep] = now_nanos(); + _unreachable_endpoints[ep] = now(); logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation()); } @@ -1256,7 +1259,7 @@ void gossiper::shutdown() { // if (scheduledGossipTask != null) // scheduledGossipTask.cancel(false); logger.info("Announcing shutdown"); - // Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS); + // Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS); for (inet_address addr : _live_endpoints) { shard_id id = get_shard_id(addr); logger.trace("Sending a GossipShutdown to {}", id); @@ -1297,13 +1300,13 @@ bool gossiper::is_in_shadow_round() { return _in_shadow_round; } -void gossiper::add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time) { - logger.debug("adding expire time for endpoint : {} ({})", endpoint, expire_time); +void gossiper::add_expire_time_for_endpoint(inet_address endpoint, clk::time_point expire_time) { + logger.debug("adding expire time for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count()); _expire_time_endpoint_map[endpoint] = expire_time; } -int64_t gossiper::compute_expire_time() { - return now_millis() + A_VERY_LONG_TIME; +clk::time_point gossiper::compute_expire_time() { + return now() + A_VERY_LONG_TIME; } void gossiper::dump_endpoint_state_map() { @@ -1314,7 +1317,7 @@ void gossiper::dump_endpoint_state_map() { } void gossiper::debug_show() { - auto reporter = std::make_shared>(); + auto reporter = std::make_shared>(); reporter->set_callback ([reporter] { auto& gossiper = gms::get_local_gossiper(); gossiper.dump_endpoint_state_map(); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 91db1aab15..a7145adb08 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -35,6 +35,7 @@ #include #include #include +#include namespace gms { @@ -59,6 +60,8 @@ class i_failure_detector; * the Failure Detector. */ class gossiper : public i_failure_detection_event_listener, public enable_shared_from_this { +public: + using clk = std::chrono::high_resolution_clock; private: using messaging_verb = net::messaging_verb; using messaging_service = net::messaging_service; @@ -80,7 +83,7 @@ private: return shard_id{to, _default_cpuid}; } void do_sort(std::vector& g_digest_list); - timer _scheduled_gossip_task; + timer _scheduled_gossip_task; sstring get_cluster_name() { // FIXME: DatabaseDescriptor.getClusterName() return "my_cluster_name"; @@ -102,26 +105,21 @@ public: _seeds_from_config = _seeds; } public: - static int64_t now_millis() { - return db_clock::now().time_since_epoch().count(); - } - static int64_t now_nanos() { - return now_millis() * 1000; - } + static clk::time_point inline now() { return clk::now(); } public: /* map where key is the endpoint and value is the state associated with the endpoint */ std::unordered_map endpoint_state_map; const std::vector DEAD_STATES = { versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN, versioned_value::STATUS_LEFT, versioned_value::HIBERNATE }; - static constexpr int INTERVAL_IN_MILLIS = 1000; - static constexpr int64_t A_VERY_LONG_TIME = 259200 * 1000; // 3 days in milliseconds + static constexpr std::chrono::milliseconds INTERVAL{1000}; + static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3}; /** Maximimum difference in generation and version values we are willing to accept about a peer */ static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365; - int64_t fat_client_timeout; + std::chrono::milliseconds fat_client_timeout; - static int quarantine_delay(); + static std::chrono::milliseconds quarantine_delay(); private: std::random_device _random; @@ -132,7 +130,7 @@ private: std::set _live_endpoints; /* unreachable member set */ - std::map _unreachable_endpoints; + std::map _unreachable_endpoints; /* initial seeds for joining the cluster */ std::set _seeds; @@ -141,13 +139,13 @@ private: * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time * after removal to prevent nodes from falsely reincarnating during the time when removal * gossip gets propagated to all nodes */ - std::map _just_removed_endpoints; + std::map _just_removed_endpoints; - std::map _expire_time_endpoint_map; + std::map _expire_time_endpoint_map; bool _in_shadow_round = false; - int64_t _last_processed_message_at = now_millis(); + clk::time_point _last_processed_message_at = now(); std::unordered_map _shadow_endpoint_state_map; std::set _shadow_live_endpoints; @@ -155,8 +153,8 @@ private: void run(); public: gossiper(); - void set_last_processed_message_at(int64_t time_in_millis) { - _last_processed_message_at = time_in_millis; + void set_last_processed_message_at(clk::time_point tp) { + _last_processed_message_at = tp; } bool seen_any_seed(); @@ -234,7 +232,7 @@ private: * @param endpoint * @param quarantine_expiration */ - void quarantine_endpoint(inet_address endpoint, int64_t quarantine_expiration); + void quarantine_endpoint(inet_address endpoint, clk::time_point quarantine_expiration); public: /** @@ -320,7 +318,7 @@ private: void do_status_check(); public: - int64_t get_expire_time_for_endpoint(inet_address endpoint); + clk::time_point get_expire_time_for_endpoint(inet_address endpoint); std::experimental::optional get_endpoint_state_for_endpoint(inet_address ep); @@ -468,9 +466,9 @@ public: #endif public: - void add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time); + void add_expire_time_for_endpoint(inet_address endpoint, clk::time_point expire_time); - static int64_t compute_expire_time(); + static clk::time_point compute_expire_time(); public: void dump_endpoint_state_map(); void debug_show();