From 264f4dadedd7e2e91dc669d8036d529bfd9b5abe Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 1 May 2023 08:26:18 +0300 Subject: [PATCH] gossiper: coroutinize handle_syn_msg Signed-off-by: Benny Halevy --- gms/gossiper.cc | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index b4c675e2ea..8339823a00 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -168,24 +168,24 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) { logger.trace("handle_syn_msg():from={},cluster_name:peer={},local={},group0_id:peer={},local={},partitioner_name:peer={},local={}", from, syn_msg.cluster_id(), get_cluster_name(), syn_msg.group0_id(), get_group0_id(), syn_msg.partioner(), get_partitioner_name()); if (!this->is_enabled()) { - return make_ready_future<>(); + co_return; } /* If the message is from a different cluster throw it away. */ if (syn_msg.cluster_id() != get_cluster_name()) { logger.warn("ClusterName mismatch from {} {}!={}", from.addr, syn_msg.cluster_id(), get_cluster_name()); - return make_ready_future<>(); + co_return; } /* If the message is from a node with a different group0 id throw it away. */ if (syn_msg.group0_id() && get_group0_id() && syn_msg.group0_id() != get_group0_id()) { logger.warn("Group0Id mismatch from {} {} != {}", from.addr, syn_msg.group0_id(), get_group0_id()); - return make_ready_future<>(); + co_return; } if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) { logger.warn("Partitioner mismatch from {} {}!={}", from.addr, syn_msg.partioner(), get_partitioner_name()); - return make_ready_future<>(); + co_return; } syn_msg_pending& p = _syn_handlers[from.addr]; @@ -195,16 +195,17 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) { // one only. logger.debug("Queue gossip syn msg from node {}, syn_msg={}", from, syn_msg); p.syn_msg = std::move(syn_msg); - return make_ready_future<>(); - } else { + co_return; + } + // FIXME: indentation // Process the syn message immediately logger.debug("Process gossip syn msg from node {}, syn_msg={}", from, syn_msg); p.pending = true; - return do_with(std::move(syn_msg), [this, from] (gossip_digest_syn& syn_msg) mutable { - return repeat([this, from, &syn_msg] { - return do_send_ack_msg(from, std::move(syn_msg)).then([this, from, &syn_msg] () mutable { + for (;;) { + try { + co_await do_send_ack_msg(from, std::move(syn_msg)); if (!_syn_handlers.contains(from.addr)) { - return stop_iteration::yes; + co_return; } syn_msg_pending& p = _syn_handlers[from.addr]; if (p.syn_msg) { @@ -213,25 +214,24 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) { from, p.syn_msg, p.pending); syn_msg = std::move(p.syn_msg.value()); p.syn_msg = {}; - return stop_iteration::no; + continue; } else { // No more pending syn msg to process p.pending = false; logger.debug("No more queued gossip syn msg from node {}, syn_msg={}, pending={}", from, p.syn_msg, p.pending); - return stop_iteration::yes; + co_return; } - }).handle_exception([this, from] (std::exception_ptr ep) { + } catch (...) { + auto ep = std::current_exception(); if (_syn_handlers.contains(from.addr)) { syn_msg_pending& p = _syn_handlers[from.addr]; p.pending = false; p.syn_msg = {}; } logger.warn("Failed to process gossip syn msg from node {}: {}", from, ep); - return make_exception_future(ep); - }); - }); - }); + throw; + } } }