gossip: Switch to use chrono for time operation

This is a long-awaited cleanup. Gossiper code runs every second, it is
not performance sensitive, so it does not make much sense to stick to
lowres db_clock, use high_resolution_clock instead.
This commit is contained in:
Asias He
2015-07-28 19:56:27 +08:00
committed by Avi Kivity
parent f9d1c6b79d
commit 0ce3d89a85
3 changed files with 73 additions and 70 deletions

View File

@@ -26,8 +26,8 @@
#include "gms/heart_beat_state.hh"
#include "gms/application_state.hh"
#include "gms/versioned_value.hh"
#include "db_clock.hh"
#include <experimental/optional>
#include <chrono>
namespace gms {
@@ -36,11 +36,13 @@ namespace gms {
* instance. Any state for a given endpoint can be retrieved from this instance.
*/
class endpoint_state {
public:
using clk = std::chrono::high_resolution_clock;
private:
heart_beat_state _heart_beat_state;
std::map<application_state, versioned_value> _application_state;
/* fields below do not get serialized */
db_clock::time_point _update_timestamp;
clk::time_point _update_timestamp;
bool _is_alive;
public:
bool operator==(const endpoint_state& other) const {
@@ -52,13 +54,13 @@ public:
endpoint_state()
: _heart_beat_state(0)
, _update_timestamp(db_clock::now())
, _update_timestamp(clk::now())
, _is_alive(true) {
}
endpoint_state(heart_beat_state initial_hb_state)
: _heart_beat_state(initial_hb_state)
, _update_timestamp(db_clock::now())
, _update_timestamp(clk::now())
, _is_alive(true) {
}
@@ -96,12 +98,12 @@ public:
/**
* @return System.nanoTime() when state was updated last time.
*/
db_clock::time_point get_update_timestamp() {
clk::time_point get_update_timestamp() {
return _update_timestamp;
}
void update_timestamp() {
_update_timestamp = db_clock::now();
_update_timestamp = clk::now();
}
bool is_alive() {

View File

@@ -34,17 +34,20 @@
#include "gms/i_failure_detector.hh"
#include "service/storage_service.hh"
#include "log.hh"
#include <chrono>
namespace gms {
using clk = gossiper::clk;
logging::logger logger("gossip");
constexpr int gossiper::INTERVAL_IN_MILLIS;
constexpr int64_t gossiper::A_VERY_LONG_TIME;
constexpr std::chrono::milliseconds gossiper::INTERVAL;
constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME;
constexpr int64_t gossiper::MAX_GENERATION_DIFFERENCE;
int gossiper::quarantine_delay() {
return service::storage_service::RING_DELAY * 2;
std::chrono::milliseconds gossiper::quarantine_delay() {
return std::chrono::milliseconds(service::storage_service::RING_DELAY * 2);
}
// FIXME: StorageService.instance.valueFactory
@@ -61,7 +64,7 @@ gossiper::gossiper() {
_scheduled_gossip_task.set_callback([this] { run(); });
// half of QUARATINE_DELAY, to ensure _just_removed_endpoints has enough leeway to prevent re-gossip
fat_client_timeout = (int64_t) (quarantine_delay() / 2);
fat_client_timeout = quarantine_delay() / 2;
/* register with the Failure Detector for receiving Failure detector events */
get_local_failure_detector().register_failure_detection_event_listener(this);
// Register this instance with JMX
@@ -107,7 +110,7 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
}
future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
this->set_last_processed_message_at(now_millis());
this->set_last_processed_message_at(now());
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
@@ -136,14 +139,14 @@ void gossiper::init_messaging_service_handler() {
// TODO: Use time_point instead of long for timing.
return smp::submit_to(0, [] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now_millis());
gossiper.set_last_processed_message_at(now());
return make_ready_future<>();
});
});
ms().register_gossip_shutdown([] (inet_address from) {
smp::submit_to(0, [from] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now_millis());
gossiper.set_last_processed_message_at(now());
// TODO: Implement processing of incoming SHUTDOWN message
get_local_failure_detector().force_conviction(from);
}).discard_result();
@@ -158,7 +161,7 @@ void gossiper::init_messaging_service_handler() {
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
smp::submit_to(0, [msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now_millis());
gossiper.set_last_processed_message_at(now());
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
gossiper.notify_failure_detector(remote_ep_state_map);
@@ -184,7 +187,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> eps
try {
auto ack_msg = f.get0();
logger.trace("Got GossipDigestSyn Reply");
this->set_last_processed_message_at(now_millis());
this->set_last_processed_message_at(now());
if (!this->is_enabled() && !this->is_in_shadow_round()) {
return;
}
@@ -351,7 +354,7 @@ void gossiper::remove_endpoint(inet_address endpoint) {
void gossiper::do_status_check() {
logger.trace("Performing status check ...");
int64_t now = now_millis();
auto now = this->now();
for (auto& entry : endpoint_state_map) {
const inet_address& endpoint = entry.first;
@@ -368,17 +371,17 @@ void gossiper::do_status_check() {
// gossip after FatClientTimeout. Do not remove dead states here.
if (is_gossip_only_member(endpoint)
&& !_just_removed_endpoints.count(endpoint)
&& ((now - ep_state.get_update_timestamp().time_since_epoch().count()) > fat_client_timeout)) {
logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fat_client_timeout);
&& ((now - ep_state.get_update_timestamp()) > fat_client_timeout)) {
logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fat_client_timeout.count());
remove_endpoint(endpoint); // will put it in _just_removed_endpoints to respect quarantine delay
evict_from_membershipg(endpoint); // can get rid of the state immediately
}
// check for dead state removal
int64_t expire_time = get_expire_time_for_endpoint(endpoint);
auto expire_time = get_expire_time_for_endpoint(endpoint);
if (!ep_state.is_alive() && (now > expire_time)
&& (!service::get_local_storage_service().get_token_metadata().is_member(endpoint))) {
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time);
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count());
evict_from_membershipg(endpoint);
}
}
@@ -387,7 +390,7 @@ void gossiper::do_status_check() {
for (auto it = _just_removed_endpoints.begin(); it != _just_removed_endpoints.end();) {
auto& t= it->second;
if ((now - t) > quarantine_delay()) {
logger.debug("{} elapsed, {} gossip quarantine over", quarantine_delay(), it->first);
logger.debug("{} ms elapsed, {} gossip quarantine over", quarantine_delay().count(), it->first);
it = _just_removed_endpoints.erase(it);
} else {
it++;
@@ -453,8 +456,6 @@ void gossiper::run() {
// them across all other shards.
// - Reschedule the gossiper only after execution on all nodes is done.
//
std::chrono::milliseconds period(INTERVAL_IN_MILLIS);
bool endpoint_map_changed =
(_shadow_endpoint_state_map != endpoint_state_map);
bool live_endpoint_changed =
@@ -483,9 +484,9 @@ void gossiper::run() {
local_gossiper._live_endpoints = _shadow_live_endpoints;
}
}
}).then([this, period] { _scheduled_gossip_task.arm(period); });
}).then([this] { _scheduled_gossip_task.arm(INTERVAL); });
} else {
_scheduled_gossip_task.arm(period);
_scheduled_gossip_task.arm(INTERVAL);
}
}
@@ -543,11 +544,12 @@ std::set<inet_address> gossiper::get_unreachable_token_owners() {
return token_owners;
}
// Return downtime in microseconds
int64_t gossiper::get_endpoint_downtime(inet_address ep) {
auto it = _unreachable_endpoints.find(ep);
if (it != _unreachable_endpoints.end()) {
auto& downtime = it->second;
return (now_nanos() - downtime) / 1000;
return std::chrono::duration_cast<std::chrono::microseconds>(now() - downtime).count();
} else {
return 0L;
}
@@ -593,17 +595,17 @@ void gossiper::evict_from_membershipg(inet_address endpoint) {
}
void gossiper::quarantine_endpoint(inet_address endpoint) {
quarantine_endpoint(endpoint, now_millis());
quarantine_endpoint(endpoint, now());
}
void gossiper::quarantine_endpoint(inet_address endpoint, int64_t quarantine_expiration) {
void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quarantine_expiration) {
_just_removed_endpoints[endpoint] = quarantine_expiration;
}
void gossiper::replacement_quarantine(inet_address endpoint) {
// remember, quarantine_endpoint will effectively already add QUARANTINE_DELAY, so this is 2x
// logger.debug("");
quarantine_endpoint(endpoint, now_millis() + quarantine_delay());
quarantine_endpoint(endpoint, now() + quarantine_delay());
}
void gossiper::replaced_endpoint(inet_address endpoint) {
@@ -672,14 +674,14 @@ void gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_i
auto& eps = endpoint_state_map.at(endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
int64_t expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time));
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
endpoint_state_map[endpoint] = eps;
// ensure at least one gossip round occurs before returning
// FIXME: sleep
//Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS);
//Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS);
warn(unimplemented::cause::GOSSIP);
}
@@ -691,8 +693,10 @@ void gossiper::unsafe_assassinate_endpoint(sstring address) {
void gossiper::assassinate_endpoint(sstring address) {
inet_address endpoint(address);
auto is_exist = endpoint_state_map.count(endpoint);
int gen = std::chrono::duration_cast<std::chrono::seconds>((now() + std::chrono::seconds(60)).time_since_epoch()).count();
int ver = 9999;
endpoint_state&& ep_state = is_exist ? endpoint_state_map.at(endpoint) :
endpoint_state(heart_beat_state((int) ((now_millis() + 60000) / 1000), 9999));
endpoint_state(heart_beat_state(gen, ver));
//Collection<Token> tokens = null;
logger.warn("Assassinating {} via gossip", endpoint);
if (is_exist) {
@@ -733,7 +737,7 @@ void gossiper::assassinate_endpoint(sstring address) {
// FIXME: StorageService.instance and Sleep
// ep_state.add_application_state(application_state::STATUS, StorageService.instance.valueFactory.left(tokens, compute_expire_time()));
handle_major_state_change(endpoint, ep_state);
// Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 4, TimeUnit.MILLISECONDS);
// Uninterruptibles.sleepUninterruptibly(INTERVAL * 4, TimeUnit.MILLISECONDS);
logger.warn("Finished assassinating {}", endpoint);
}
@@ -807,13 +811,13 @@ bool gossiper::is_gossip_only_member(inet_address endpoint) {
return !is_dead_state(eps) && !ss.get_token_metadata().is_member(endpoint);
}
int64_t gossiper::get_expire_time_for_endpoint(inet_address endpoint) {
clk::time_point gossiper::get_expire_time_for_endpoint(inet_address endpoint) {
/* default expire_time is A_VERY_LONG_TIME */
auto it = _expire_time_endpoint_map.find(endpoint);
if (it == _expire_time_endpoint_map.end()) {
return compute_expire_time();
} else {
int64_t stored_time = it->second;
auto stored_time = it->second;
return stored_time;
}
}
@@ -921,7 +925,7 @@ void gossiper::mark_alive(inet_address addr, endpoint_state local_state) {
try {
f.get();
logger.trace("Got EchoMessage Reply");
this->set_last_processed_message_at(now_millis());
this->set_last_processed_message_at(now());
this->real_mark_alive(id.addr, local_state);
} catch (...) {
logger.error("Fail to send EchoMessage to {}: {}", id, std::current_exception());
@@ -948,7 +952,7 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as down {}", addr);
local_state.mark_dead();
_live_endpoints.erase(addr);
_unreachable_endpoints[addr] = now_nanos();
_unreachable_endpoints[addr] = now();
logger.info("inet_address {} is now DOWN", addr);
for (auto& subscriber : _subscribers) {
subscriber->on_dead(addr, local_state);
@@ -1141,8 +1145,7 @@ future<> gossiper::start(int generation_nbr, std::map<application_state, version
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
#endif
logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation());
std::chrono::milliseconds period(INTERVAL_IN_MILLIS);
_scheduled_gossip_task.arm(period);
_scheduled_gossip_task.arm(INTERVAL);
return make_ready_future<>();
});
}
@@ -1160,7 +1163,7 @@ void gossiper::do_shadow_round() {
try {
auto ack_msg = f.get0();
logger.trace("Got GossipDigestSyn Reply");
this->set_last_processed_message_at(now_millis());
this->set_last_processed_message_at(now());
if (this->is_in_shadow_round()) {
this->finish_shadow_round();
}
@@ -1223,7 +1226,7 @@ void gossiper::add_saved_endpoint(inet_address ep) {
}
ep_state.mark_dead();
endpoint_state_map[ep] = ep_state;
_unreachable_endpoints[ep] = now_nanos();
_unreachable_endpoints[ep] = now();
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
}
@@ -1256,7 +1259,7 @@ void gossiper::shutdown() {
// if (scheduledGossipTask != null)
// scheduledGossipTask.cancel(false);
logger.info("Announcing shutdown");
// Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS);
// Uninterruptibles.sleepUninterruptibly(INTERVAL * 2, TimeUnit.MILLISECONDS);
for (inet_address addr : _live_endpoints) {
shard_id id = get_shard_id(addr);
logger.trace("Sending a GossipShutdown to {}", id);
@@ -1297,13 +1300,13 @@ bool gossiper::is_in_shadow_round() {
return _in_shadow_round;
}
void gossiper::add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time) {
logger.debug("adding expire time for endpoint : {} ({})", endpoint, expire_time);
void gossiper::add_expire_time_for_endpoint(inet_address endpoint, clk::time_point expire_time) {
logger.debug("adding expire time for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count());
_expire_time_endpoint_map[endpoint] = expire_time;
}
int64_t gossiper::compute_expire_time() {
return now_millis() + A_VERY_LONG_TIME;
clk::time_point gossiper::compute_expire_time() {
return now() + A_VERY_LONG_TIME;
}
void gossiper::dump_endpoint_state_map() {
@@ -1314,7 +1317,7 @@ void gossiper::dump_endpoint_state_map() {
}
void gossiper::debug_show() {
auto reporter = std::make_shared<timer<lowres_clock>>();
auto reporter = std::make_shared<timer<clk>>();
reporter->set_callback ([reporter] {
auto& gossiper = gms::get_local_gossiper();
gossiper.dump_endpoint_state_map();

View File

@@ -35,6 +35,7 @@
#include <boost/algorithm/string.hpp>
#include <experimental/optional>
#include <algorithm>
#include <chrono>
namespace gms {
@@ -59,6 +60,8 @@ class i_failure_detector;
* the Failure Detector.
*/
class gossiper : public i_failure_detection_event_listener, public enable_shared_from_this<gossiper> {
public:
using clk = std::chrono::high_resolution_clock;
private:
using messaging_verb = net::messaging_verb;
using messaging_service = net::messaging_service;
@@ -80,7 +83,7 @@ private:
return shard_id{to, _default_cpuid};
}
void do_sort(std::vector<gossip_digest>& g_digest_list);
timer<lowres_clock> _scheduled_gossip_task;
timer<clk> _scheduled_gossip_task;
sstring get_cluster_name() {
// FIXME: DatabaseDescriptor.getClusterName()
return "my_cluster_name";
@@ -102,26 +105,21 @@ public:
_seeds_from_config = _seeds;
}
public:
static int64_t now_millis() {
return db_clock::now().time_since_epoch().count();
}
static int64_t now_nanos() {
return now_millis() * 1000;
}
static clk::time_point inline now() { return clk::now(); }
public:
/* map where key is the endpoint and value is the state associated with the endpoint */
std::unordered_map<inet_address, endpoint_state> endpoint_state_map;
const std::vector<sstring> DEAD_STATES = { versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN,
versioned_value::STATUS_LEFT, versioned_value::HIBERNATE };
static constexpr int INTERVAL_IN_MILLIS = 1000;
static constexpr int64_t A_VERY_LONG_TIME = 259200 * 1000; // 3 days in milliseconds
static constexpr std::chrono::milliseconds INTERVAL{1000};
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
/** Maximimum difference in generation and version values we are willing to accept about a peer */
static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365;
int64_t fat_client_timeout;
std::chrono::milliseconds fat_client_timeout;
static int quarantine_delay();
static std::chrono::milliseconds quarantine_delay();
private:
std::random_device _random;
@@ -132,7 +130,7 @@ private:
std::set<inet_address> _live_endpoints;
/* unreachable member set */
std::map<inet_address, int64_t> _unreachable_endpoints;
std::map<inet_address, clk::time_point> _unreachable_endpoints;
/* initial seeds for joining the cluster */
std::set<inet_address> _seeds;
@@ -141,13 +139,13 @@ private:
* gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
* after removal to prevent nodes from falsely reincarnating during the time when removal
* gossip gets propagated to all nodes */
std::map<inet_address, int64_t> _just_removed_endpoints;
std::map<inet_address, clk::time_point> _just_removed_endpoints;
std::map<inet_address, int64_t> _expire_time_endpoint_map;
std::map<inet_address, clk::time_point> _expire_time_endpoint_map;
bool _in_shadow_round = false;
int64_t _last_processed_message_at = now_millis();
clk::time_point _last_processed_message_at = now();
std::unordered_map<inet_address, endpoint_state> _shadow_endpoint_state_map;
std::set<inet_address> _shadow_live_endpoints;
@@ -155,8 +153,8 @@ private:
void run();
public:
gossiper();
void set_last_processed_message_at(int64_t time_in_millis) {
_last_processed_message_at = time_in_millis;
void set_last_processed_message_at(clk::time_point tp) {
_last_processed_message_at = tp;
}
bool seen_any_seed();
@@ -234,7 +232,7 @@ private:
* @param endpoint
* @param quarantine_expiration
*/
void quarantine_endpoint(inet_address endpoint, int64_t quarantine_expiration);
void quarantine_endpoint(inet_address endpoint, clk::time_point quarantine_expiration);
public:
/**
@@ -320,7 +318,7 @@ private:
void do_status_check();
public:
int64_t get_expire_time_for_endpoint(inet_address endpoint);
clk::time_point get_expire_time_for_endpoint(inet_address endpoint);
std::experimental::optional<endpoint_state> get_endpoint_state_for_endpoint(inet_address ep);
@@ -468,9 +466,9 @@ public:
#endif
public:
void add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time);
void add_expire_time_for_endpoint(inet_address endpoint, clk::time_point expire_time);
static int64_t compute_expire_time();
static clk::time_point compute_expire_time();
public:
void dump_endpoint_state_map();
void debug_show();