gossip: Add set_last_processed_message_at() helper

Set time_point to now by default.
This commit is contained in:
Asias He
2015-07-29 09:54:29 +08:00
parent 65232edfe8
commit 33d3fcf7db
2 changed files with 18 additions and 10 deletions

View File

@@ -70,6 +70,14 @@ gossiper::gossiper() {
// Register this instance with JMX
}
void gossiper::set_last_processed_message_at() {
set_last_processed_message_at(now());
}
void gossiper::set_last_processed_message_at(clk::time_point tp) {
_last_processed_message_at = tp;
}
/*
* First construct a map whose key is the endpoint in the GossipDigest and the value is the
* GossipDigest itself. Then build a list of version differences i.e difference between the
@@ -110,7 +118,7 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
}
future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
this->set_last_processed_message_at(now());
this->set_last_processed_message_at();
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
@@ -139,14 +147,14 @@ void gossiper::init_messaging_service_handler() {
// TODO: Use time_point instead of long for timing.
return smp::submit_to(0, [] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now());
gossiper.set_last_processed_message_at();
return make_ready_future<>();
});
});
ms().register_gossip_shutdown([] (inet_address from) {
smp::submit_to(0, [from] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now());
gossiper.set_last_processed_message_at();
// TODO: Implement processing of incoming SHUTDOWN message
get_local_failure_detector().force_conviction(from);
}).discard_result();
@@ -161,7 +169,7 @@ void gossiper::init_messaging_service_handler() {
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
smp::submit_to(0, [msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now());
gossiper.set_last_processed_message_at();
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
gossiper.notify_failure_detector(remote_ep_state_map);
@@ -187,7 +195,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> eps
try {
auto ack_msg = f.get0();
logger.trace("Got GossipDigestSyn Reply");
this->set_last_processed_message_at(now());
this->set_last_processed_message_at();
if (!this->is_enabled() && !this->is_in_shadow_round()) {
return;
}
@@ -925,7 +933,7 @@ void gossiper::mark_alive(inet_address addr, endpoint_state local_state) {
try {
f.get();
logger.trace("Got EchoMessage Reply");
this->set_last_processed_message_at(now());
this->set_last_processed_message_at();
this->real_mark_alive(id.addr, local_state);
} catch (...) {
logger.error("Fail to send EchoMessage to {}: {}", id, std::current_exception());
@@ -1163,7 +1171,7 @@ void gossiper::do_shadow_round() {
try {
auto ack_msg = f.get0();
logger.trace("Got GossipDigestSyn Reply");
this->set_last_processed_message_at(now());
this->set_last_processed_message_at();
if (this->is_in_shadow_round()) {
this->finish_shadow_round();
}

View File

@@ -153,9 +153,9 @@ private:
void run();
public:
gossiper();
void set_last_processed_message_at(clk::time_point tp) {
_last_processed_message_at = tp;
}
void set_last_processed_message_at();
void set_last_processed_message_at(clk::time_point tp);
bool seen_any_seed();