From d7f053a494130aa3ff9fa332fc64bb8e7f87f12e Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 13 Apr 2015 14:27:20 +0800 Subject: [PATCH] gossip: Implement GossipDigestSynVerbHandler --- gms/gossiper.cc | 79 +++++++++++++++++++++++++++++++++++++++++-------- gms/gossiper.hh | 1 + 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index d6da08f5a0..dae3e172b1 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -13,6 +13,45 @@ gossiper::gossiper() init_messaging_service_handler(); } +/* + * First construct a map whose key is the endpoint in the GossipDigest and the value is the + * GossipDigest itself. Then build a list of version differences i.e difference between the + * version in the GossipDigest and the version in the local state for a given InetAddress. + * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding + * to the endpoint from the map that was initially constructed. +*/ +void gossiper::do_sort(std::vector& g_digest_list) { + /* Construct a map of endpoint to GossipDigest. */ + std::map ep_to_digest_map; + for (auto g_digest : g_digest_list) { + ep_to_digest_map.emplace(g_digest.get_endpoint(), g_digest); + } + + /* + * These digests have their maxVersion set to the difference of the version + * of the local EndpointState and the version found in the GossipDigest. + */ + std::vector diff_digests; + for (auto g_digest : g_digest_list) { + auto ep = g_digest.get_endpoint(); + auto ep_state = this->get_endpoint_state_for_endpoint(ep); + int version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : 0; + int diff_version = ::abs(version - g_digest.get_max_version()); + diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), diff_version)); + } + + g_digest_list.clear(); + std::sort(diff_digests.begin(), diff_digests.end()); + int size = diff_digests.size(); + /* + * Report the digests in descending order. This takes care of the endpoints + * that are far behind w.r.t this local endpoint + */ + for (int i = size - 1; i >= 0; --i) { + g_digest_list.emplace_back(ep_to_digest_map[diff_digests[i].get_endpoint()]); + } +} + void gossiper::init_messaging_service_handler() { ms().register_handler(messaging_verb::ECHO, [] (empty_msg msg) { return make_ready_future(); @@ -22,20 +61,34 @@ void gossiper::init_messaging_service_handler() { get_local_failure_detector().force_conviction(from); return messaging_service::no_wait(); }); - ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gossip_digest_syn msg) { + ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [this] (gossip_digest_syn syn_msg) { // TODO: Implement processing of incoming ACK2 message - print("gossiper: Server got syn msg = %s\n", msg); - auto ep = inet_address("2.2.2.2"); - int32_t gen = 800; - int32_t ver = 900; - std::vector digests{ - {ep, gen++, ver++}, - }; - std::map eps{ - {ep, endpoint_state()}, - }; - gms::gossip_digest_ack ack(std::move(digests), std::move(eps)); - return make_ready_future(ack); + print("gossiper: Server got syn msg = %s\n", syn_msg); + inet_address from; + if (!this->is_enabled()) { + return make_ready_future(gossip_digest_ack()); + } + + /* If the message is from a different cluster throw it away. */ + // FIXME: DatabaseDescriptor.getClusterName and DatabaseDescriptor.getPartitionerName() +#if 0 + if (!syn_msg.cluster_id().equals(DatabaseDescriptor.getClusterName())) { + //logger.warn("ClusterName mismatch from {} {}!={}", from, syn_msg.clusterId, DatabaseDescriptor.getClusterName()); + return; + } + + if (syn_msg.partioner() != "" && !syn_msg.partioner.equals(DatabaseDescriptor.getPartitionerName())) { + logger.warn("Partitioner mismatch from {} {}!={}", from, syn_msg.partioner, DatabaseDescriptor.getPartitionerName()); + return; + } +#endif + auto g_digest_list = syn_msg.get_gossip_digests(); + do_sort(g_digest_list); + std::vector delta_gossip_digest_list; + std::map delta_ep_state_map; + this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map); + gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map)); + return make_ready_future(std::move(ack_msg)); }); ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [this] (gossip_digest_ack2 msg) { print("gossiper: Server got ack2 msg = %s\n", msg); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 7eddd9cdca..8d63071c12 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -85,6 +85,7 @@ private: shard_id get_shard_id(inet_address to) { return shard_id{to, _default_cpuid}; } + void do_sort(std::vector& g_digest_list); timer _scheduled_gossip_task; private: inet_address get_broadcast_address() {