From f32ae00510dcf11ce402c4551ede076b7baec15f Mon Sep 17 00:00:00 2001 From: Asias He Date: Sat, 19 Oct 2019 06:54:25 +0800 Subject: [PATCH] gossip: Limit number of pending gossip ACK2 messages Similar to "gossip: Limit number of pending gossip ACK messages", limit the number of pending gossip ACK2 messages in gossiper::handle_ack_msg. Fixes #5210 --- gms/gossiper.cc | 61 ++++++++++++++++++++++++++++++++++++++++++++----- gms/gossiper.hh | 8 +++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index db78c0f9fd..2f2f24bce9 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -322,15 +322,65 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { assert(_msg_processing > 0); - return f.then([id, g_digest_list = std::move(g_digest_list), mp = std::move(mp), this] { + return f.then([this, from = id, ack_msg_digest = std::move(g_digest_list), mp = std::move(mp), g = this->shared_from_this()] () mutable { if (this->is_in_shadow_round()) { this->finish_shadow_round(); // don't bother doing anything else, we have what we came for return make_ready_future<>(); } + ack_msg_pending& p = _ack_handlers[from.addr]; + if (p.pending) { + // The latest ack message digests from peer has the latest infomation, so + // it is safe to drop the previous ack message digests and keep the latest + // one only. + logger.debug("Queue gossip ack msg digests from node {}, ack_msg_digest={}", from, ack_msg_digest); + p.ack_msg_digest = std::move(ack_msg_digest); + return make_ready_future<>(); + } else { + // Process the ack message immediately + logger.debug("Process gossip ack msg digests from node {}, ack_msg_digest={}", from, ack_msg_digest); + p.pending = true; + return do_with(std::move(ack_msg_digest), [this, from, g] (utils::chunked_vector& ack_msg_digest) mutable { + return repeat([this, from, g, &ack_msg_digest] { + return do_send_ack2_msg(from, std::move(ack_msg_digest)).then([this, from, &ack_msg_digest] () mutable { + if (!_ack_handlers.count(from.addr)) { + return stop_iteration::yes; + } + ack_msg_pending& p = _ack_handlers[from.addr]; + if (p.ack_msg_digest) { + // Process pending gossip ack msg digests and send ack2 msg back + logger.debug("Handle queued gossip ack msg digests from node {}, ack_msg_digest={}, pending={}", + from, p.ack_msg_digest, p.pending); + ack_msg_digest = std::move(p.ack_msg_digest.value()); + p.ack_msg_digest= {}; + return stop_iteration::no; + } else { + // No more pending ack msg digests to process + p.pending = false; + logger.debug("No more queued gossip ack msg digests from node {}, ack_msg_digest={}, pending={}", + from, p.ack_msg_digest, p.pending); + return stop_iteration::yes; + } + }).handle_exception([this, from] (std::exception_ptr ep) { + if (_ack_handlers.count(from.addr)) { + ack_msg_pending& p = _ack_handlers[from.addr]; + p.pending = false; + p.ack_msg_digest = {}; + logger.warn("Failed to process gossip ack msg digests from node {}: {}", from, ep); + } + return make_exception_future(ep); + }); + }); + }); + } + }); +} + +future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest) { + return futurize_apply([this, from, ack_msg_digest = std::move(ack_msg_digest)] () mutable { /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ std::map delta_ep_state_map; - for (auto g_digest : g_digest_list) { + for (auto g_digest : ack_msg_digest) { inet_address addr = g_digest.get_endpoint(); auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, g_digest.get_max_version()); if (local_ep_state_ptr) { @@ -338,10 +388,8 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { } } gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map)); - logger.trace("Sending a GossipDigestACK2 to {}", id); - return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).handle_exception([id] (auto ep) { - logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, ep); - }); + logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg); + return this->ms().send_gossip_digest_ack2(from, std::move(ack2_msg)); }); } @@ -597,6 +645,7 @@ void gossiper::remove_endpoint(inet_address endpoint) { _live_endpoints_just_added.remove(endpoint); _unreachable_endpoints.erase(endpoint); _syn_handlers.erase(endpoint); + _ack_handlers.erase(endpoint); quarantine_endpoint(endpoint); logger.debug("removing endpoint {}", endpoint); } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 09e4b87a5a..6a038d35a8 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -51,6 +51,7 @@ #include "gms/endpoint_state.hh" #include "gms/feature.hh" #include "gms/gossip_digest_syn.hh" +#include "gms/gossip_digest.hh" #include "utils/loading_shared_values.hh" #include "utils/in.hh" #include "message/messaging_service_fwd.hh" @@ -85,6 +86,11 @@ struct syn_msg_pending { std::optional syn_msg; }; +struct ack_msg_pending { + bool pending = false; + std::optional> ack_msg_digest; +}; + /** * This module is responsible for Gossiping information for the local endpoint. This abstraction * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module @@ -116,6 +122,7 @@ private: future<> handle_echo_msg(); future<> handle_shutdown_msg(inet_address from); future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg); + future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest); static constexpr uint32_t _default_cpuid = 0; msg_addr get_msg_addr(inet_address to); void do_sort(utils::chunked_vector& g_digest_list); @@ -126,6 +133,7 @@ private: semaphore _callback_running{1}; semaphore _apply_state_locally_semaphore{100}; std::unordered_map _syn_handlers; + std::unordered_map _ack_handlers; public: sstring get_cluster_name(); sstring get_partitioner_name();