gossip: Limit number of pending gossip ACK2 messages

Similar to "gossip: Limit number of pending gossip ACK messages", limit
the number of pending gossip ACK2 messages in gossiper::handle_ack_msg.

Fixes #5210
This commit is contained in:
Asias He
2019-10-19 06:54:25 +08:00
parent 15148182ab
commit f32ae00510
2 changed files with 63 additions and 6 deletions

View File

@@ -322,15 +322,65 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
assert(_msg_processing > 0);
return f.then([id, g_digest_list = std::move(g_digest_list), mp = std::move(mp), this] {
return f.then([this, from = id, ack_msg_digest = std::move(g_digest_list), mp = std::move(mp), g = this->shared_from_this()] () mutable {
if (this->is_in_shadow_round()) {
this->finish_shadow_round();
// don't bother doing anything else, we have what we came for
return make_ready_future<>();
}
ack_msg_pending& p = _ack_handlers[from.addr];
if (p.pending) {
// The latest ack message digests from a peer carry the latest information, so
// it is safe to drop the previously queued ack message digests and keep only
// the latest one.
logger.debug("Queue gossip ack msg digests from node {}, ack_msg_digest={}", from, ack_msg_digest);
p.ack_msg_digest = std::move(ack_msg_digest);
return make_ready_future<>();
} else {
// Process the ack message immediately
logger.debug("Process gossip ack msg digests from node {}, ack_msg_digest={}", from, ack_msg_digest);
p.pending = true;
return do_with(std::move(ack_msg_digest), [this, from, g] (utils::chunked_vector<gossip_digest>& ack_msg_digest) mutable {
return repeat([this, from, g, &ack_msg_digest] {
return do_send_ack2_msg(from, std::move(ack_msg_digest)).then([this, from, &ack_msg_digest] () mutable {
if (!_ack_handlers.count(from.addr)) {
return stop_iteration::yes;
}
ack_msg_pending& p = _ack_handlers[from.addr];
if (p.ack_msg_digest) {
// Process pending gossip ack msg digests and send ack2 msg back
logger.debug("Handle queued gossip ack msg digests from node {}, ack_msg_digest={}, pending={}",
from, p.ack_msg_digest, p.pending);
ack_msg_digest = std::move(p.ack_msg_digest.value());
p.ack_msg_digest= {};
return stop_iteration::no;
} else {
// No more pending ack msg digests to process
p.pending = false;
logger.debug("No more queued gossip ack msg digests from node {}, ack_msg_digest={}, pending={}",
from, p.ack_msg_digest, p.pending);
return stop_iteration::yes;
}
}).handle_exception([this, from] (std::exception_ptr ep) {
if (_ack_handlers.count(from.addr)) {
ack_msg_pending& p = _ack_handlers[from.addr];
p.pending = false;
p.ack_msg_digest = {};
logger.warn("Failed to process gossip ack msg digests from node {}: {}", from, ep);
}
return make_exception_future<stop_iteration>(ep);
});
});
});
}
});
}
future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest) {
return futurize_apply([this, from, ack_msg_digest = std::move(ack_msg_digest)] () mutable {
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
std::map<inet_address, endpoint_state> delta_ep_state_map;
for (auto g_digest : g_digest_list) {
for (auto g_digest : ack_msg_digest) {
inet_address addr = g_digest.get_endpoint();
auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, g_digest.get_max_version());
if (local_ep_state_ptr) {
@@ -338,10 +388,8 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
}
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
logger.trace("Sending a GossipDigestACK2 to {}", id);
return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).handle_exception([id] (auto ep) {
logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, ep);
});
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
return this->ms().send_gossip_digest_ack2(from, std::move(ack2_msg));
});
}
@@ -597,6 +645,7 @@ void gossiper::remove_endpoint(inet_address endpoint) {
_live_endpoints_just_added.remove(endpoint);
_unreachable_endpoints.erase(endpoint);
_syn_handlers.erase(endpoint);
_ack_handlers.erase(endpoint);
quarantine_endpoint(endpoint);
logger.debug("removing endpoint {}", endpoint);
}

View File

@@ -51,6 +51,7 @@
#include "gms/endpoint_state.hh"
#include "gms/feature.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest.hh"
#include "utils/loading_shared_values.hh"
#include "utils/in.hh"
#include "message/messaging_service_fwd.hh"
@@ -85,6 +86,11 @@ struct syn_msg_pending {
std::optional<gossip_digest_syn> syn_msg;
};
// Per-peer bookkeeping for incoming gossip ACK messages. The gossiper keeps
// one of these per endpoint in _ack_handlers so that at most one ACK from a
// given peer is processed at a time and at most one more is kept queued.
struct ack_msg_pending {
// True while an ACK from this peer is being processed. It is set before the
// ACK2 reply is sent and cleared once no queued digests remain or the
// processing fails.
bool pending = false;
// Digest list of the most recent ACK that arrived while `pending` was true.
// A newer ACK overwrites an older queued one; the value is moved out and
// reset to empty when the processing loop picks it up, and is also reset on
// failure.
std::optional<utils::chunked_vector<gossip_digest>> ack_msg_digest;
};
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -116,6 +122,7 @@ private:
future<> handle_echo_msg();
future<> handle_shutdown_msg(inet_address from);
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest);
static constexpr uint32_t _default_cpuid = 0;
msg_addr get_msg_addr(inet_address to);
void do_sort(utils::chunked_vector<gossip_digest>& g_digest_list);
@@ -126,6 +133,7 @@ private:
semaphore _callback_running{1};
semaphore _apply_state_locally_semaphore{100};
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
public:
sstring get_cluster_name();
sstring get_partitioner_name();