gossiper: Add and improve logging

This commit is contained in:
Tomasz Grabiec
2017-10-04 12:37:09 +02:00
parent 0ed84710d9
commit 41ffefd194

View File

@@ -184,8 +184,8 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
// Depends on
// - no external dependency
future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
logger.trace("cluster_name:peer={},local={},partitioner_name:peer={},local={}",
syn_msg.cluster_id(), get_cluster_name(), syn_msg.partioner(), get_partitioner_name());
logger.trace("handle_syn_msg():from={},cluster_name:peer={},local={},partitioner_name:peer={},local={}",
from, syn_msg.cluster_id(), get_cluster_name(), syn_msg.partioner(), get_partitioner_name());
this->set_last_processed_message_at();
if (!this->is_enabled()) {
return make_ready_future<>();
@@ -208,6 +208,7 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
std::map<inet_address, endpoint_state> delta_ep_state_map;
this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
logger.trace("handle_syn_msg(): response={}", ack_msg);
return this->ms().send_gossip_digest_ack(from, std::move(ack_msg));
}
@@ -218,6 +219,8 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
// - on_join callbacks
// - on_alive
future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
logger.trace("handle_ack_msg():from={},msg={}", id, ack_msg);
this->set_last_processed_message_at();
if (!this->is_enabled() && !this->is_in_shadow_round()) {
return make_ready_future<>();
@@ -263,6 +266,7 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
// - on_join callbacks
// - on_alive callbacks
future<> gossiper::handle_ack2_msg(gossip_digest_ack2 msg) {
logger.trace("handle_ack2_msg():msg={}", msg);
set_last_processed_message_at();
if (!is_enabled()) {
return make_ready_future<>();
@@ -409,6 +413,7 @@ void gossiper::notify_failure_detector(inet_address endpoint, const endpoint_sta
future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> map) {
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, g = this->shared_from_this(), map = std::move(map)] {
return seastar::async([this, g, map = std::move(map)] () mutable {
auto start = std::chrono::steady_clock::now();
auto endpoints = boost::copy_range<std::vector<inet_address>>(map | boost::adaptors::map_keys);
std::shuffle(endpoints.begin(), endpoints.end(), _random_engine);
auto node_is_seed = [this] (gms::inet_address ip) { return is_seed(ip); };
@@ -464,6 +469,8 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
handle_major_state_change(ep, remote_state);
}
}
logger.debug("apply_state_locally() took {} ms", std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start).count());
});
});
}
@@ -1130,8 +1137,7 @@ std::experimental::optional<endpoint_state> gossiper::get_state_for_version_bigg
reqd_endpoint_state.emplace(eps.get_heart_beat_state());
}
auto& key = entry.first;
// FIXME: Add operator<< for application_state
logger.trace("Adding state {}: {}" , key, value.value);
logger.trace("Adding state of {}, {}: {}" , for_endpoint, key, value.value);
reqd_endpoint_state->add_application_state(key, value);
}
}
@@ -1386,6 +1392,7 @@ void gossiper::send_all(gossip_digest& g_digest,
std::map<inet_address, endpoint_state>& delta_ep_state_map,
int max_remote_version) {
auto ep = g_digest.get_endpoint();
logger.trace("send_all(): ep={}, version > {}", ep, max_remote_version);
auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version);
if (local_ep_state_ptr) {
delta_ep_state_map[ep] = *local_ep_state_ptr;
@@ -1421,6 +1428,8 @@ void gossiper::examine_gossiper(std::vector<gossip_digest>& g_digest_list,
int local_generation = ep_state_ptr.get_heart_beat_state().get_generation();
/* get the max version of all keys in the state associated with this endpoint */
int max_local_version = get_max_endpoint_state_version(ep_state_ptr);
logger.trace("examine_gossiper(): ep={}, remote={}.{}, local={}.{}", it->first,
remote_generation, max_remote_version, local_generation, max_local_version);
if (remote_generation == local_generation && max_remote_version == max_local_version) {
continue;
}
@@ -1443,6 +1452,7 @@ void gossiper::examine_gossiper(std::vector<gossip_digest>& g_digest_list,
* greater than the max remote version.
*/
if (max_remote_version > max_local_version) {
logger.trace("examine_gossiper(): requesting version > {} from {}", max_local_version, g_digest.get_endpoint());
delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, max_local_version);
} else if (max_remote_version < max_local_version) {
/* send all data with generation = localgeneration and version > max_remote_version */