gossiper: coroutinize handle_syn_msg

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-05-01 08:26:18 +03:00
parent 63ab5f1ab3
commit 264f4daded

View File

@@ -168,24 +168,24 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
logger.trace("handle_syn_msg():from={},cluster_name:peer={},local={},group0_id:peer={},local={},partitioner_name:peer={},local={}",
from, syn_msg.cluster_id(), get_cluster_name(), syn_msg.group0_id(), get_group0_id(), syn_msg.partioner(), get_partitioner_name());
if (!this->is_enabled()) {
return make_ready_future<>();
co_return;
}
/* If the message is from a different cluster throw it away. */
if (syn_msg.cluster_id() != get_cluster_name()) {
logger.warn("ClusterName mismatch from {} {}!={}", from.addr, syn_msg.cluster_id(), get_cluster_name());
return make_ready_future<>();
co_return;
}
/* If the message is from a node with a different group0 id throw it away. */
if (syn_msg.group0_id() && get_group0_id() && syn_msg.group0_id() != get_group0_id()) {
logger.warn("Group0Id mismatch from {} {} != {}", from.addr, syn_msg.group0_id(), get_group0_id());
return make_ready_future<>();
co_return;
}
if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) {
logger.warn("Partitioner mismatch from {} {}!={}", from.addr, syn_msg.partioner(), get_partitioner_name());
return make_ready_future<>();
co_return;
}
syn_msg_pending& p = _syn_handlers[from.addr];
@@ -195,16 +195,17 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
// one only.
logger.debug("Queue gossip syn msg from node {}, syn_msg={}", from, syn_msg);
p.syn_msg = std::move(syn_msg);
return make_ready_future<>();
} else {
co_return;
}
// FIXME: indentation
// Process the syn message immediately
logger.debug("Process gossip syn msg from node {}, syn_msg={}", from, syn_msg);
p.pending = true;
return do_with(std::move(syn_msg), [this, from] (gossip_digest_syn& syn_msg) mutable {
return repeat([this, from, &syn_msg] {
return do_send_ack_msg(from, std::move(syn_msg)).then([this, from, &syn_msg] () mutable {
for (;;) {
try {
co_await do_send_ack_msg(from, std::move(syn_msg));
if (!_syn_handlers.contains(from.addr)) {
return stop_iteration::yes;
co_return;
}
syn_msg_pending& p = _syn_handlers[from.addr];
if (p.syn_msg) {
@@ -213,25 +214,24 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
from, p.syn_msg, p.pending);
syn_msg = std::move(p.syn_msg.value());
p.syn_msg = {};
return stop_iteration::no;
continue;
} else {
// No more pending syn msg to process
p.pending = false;
logger.debug("No more queued gossip syn msg from node {}, syn_msg={}, pending={}",
from, p.syn_msg, p.pending);
return stop_iteration::yes;
co_return;
}
}).handle_exception([this, from] (std::exception_ptr ep) {
} catch (...) {
auto ep = std::current_exception();
if (_syn_handlers.contains(from.addr)) {
syn_msg_pending& p = _syn_handlers[from.addr];
p.pending = false;
p.syn_msg = {};
}
logger.warn("Failed to process gossip syn msg from node {}: {}", from, ep);
return make_exception_future<stop_iteration>(ep);
});
});
});
throw;
}
}
}