gossip: Implement GossipDigestSynVerbHandler

This commit is contained in:
Asias He
2015-04-13 14:27:20 +08:00
parent 3809934a05
commit d7f053a494
2 changed files with 67 additions and 13 deletions

View File

@@ -13,6 +13,45 @@ gossiper::gossiper()
init_messaging_service_handler();
}
/*
* First construct a map whose key is the endpoint in the GossipDigest and the value is the
* GossipDigest itself. Then build a list of version differences i.e difference between the
* version in the GossipDigest and the version in the local state for a given InetAddress.
* Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
* to the endpoint from the map that was initially constructed.
*/
void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
/* Construct a map of endpoint to GossipDigest. */
std::map<inet_address, gossip_digest> ep_to_digest_map;
for (auto g_digest : g_digest_list) {
ep_to_digest_map.emplace(g_digest.get_endpoint(), g_digest);
}
/*
* These digests have their maxVersion set to the difference of the version
* of the local EndpointState and the version found in the GossipDigest.
*/
std::vector<gossip_digest> diff_digests;
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
auto ep_state = this->get_endpoint_state_for_endpoint(ep);
int version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : 0;
int diff_version = ::abs(version - g_digest.get_max_version());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), diff_version));
}
g_digest_list.clear();
std::sort(diff_digests.begin(), diff_digests.end());
int size = diff_digests.size();
/*
* Report the digests in descending order. This takes care of the endpoints
* that are far behind w.r.t this local endpoint
*/
for (int i = size - 1; i >= 0; --i) {
g_digest_list.emplace_back(ep_to_digest_map[diff_digests[i].get_endpoint()]);
}
}
void gossiper::init_messaging_service_handler() {
ms().register_handler(messaging_verb::ECHO, [] (empty_msg msg) {
return make_ready_future<empty_msg>();
@@ -22,20 +61,34 @@ void gossiper::init_messaging_service_handler() {
get_local_failure_detector().force_conviction(from);
return messaging_service::no_wait();
});
ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gossip_digest_syn msg) {
ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [this] (gossip_digest_syn syn_msg) {
// TODO: Implement processing of incoming ACK2 message
print("gossiper: Server got syn msg = %s\n", msg);
auto ep = inet_address("2.2.2.2");
int32_t gen = 800;
int32_t ver = 900;
std::vector<gms::gossip_digest> digests{
{ep, gen++, ver++},
};
std::map<inet_address, endpoint_state> eps{
{ep, endpoint_state()},
};
gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
return make_ready_future<gossip_digest_ack>(ack);
print("gossiper: Server got syn msg = %s\n", syn_msg);
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
/* If the message is from a different cluster throw it away. */
// FIXME: DatabaseDescriptor.getClusterName and DatabaseDescriptor.getPartitionerName()
#if 0
if (!syn_msg.cluster_id().equals(DatabaseDescriptor.getClusterName())) {
//logger.warn("ClusterName mismatch from {} {}!={}", from, syn_msg.clusterId, DatabaseDescriptor.getClusterName());
return;
}
if (syn_msg.partioner() != "" && !syn_msg.partioner.equals(DatabaseDescriptor.getPartitionerName())) {
logger.warn("Partitioner mismatch from {} {}!={}", from, syn_msg.partioner, DatabaseDescriptor.getPartitionerName());
return;
}
#endif
auto g_digest_list = syn_msg.get_gossip_digests();
do_sort(g_digest_list);
std::vector<gossip_digest> delta_gossip_digest_list;
std::map<inet_address, endpoint_state> delta_ep_state_map;
this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
return make_ready_future<gossip_digest_ack>(std::move(ack_msg));
});
ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [this] (gossip_digest_ack2 msg) {
print("gossiper: Server got ack2 msg = %s\n", msg);

View File

@@ -85,6 +85,7 @@ private:
shard_id get_shard_id(inet_address to) {
return shard_id{to, _default_cpuid};
}
void do_sort(std::vector<gossip_digest>& g_digest_list);
timer<lowres_clock> _scheduled_gossip_task;
private:
inet_address get_broadcast_address() {