From 8e9ebd82fc8404e8096639a3c2b0bf0fe09660ea Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 25 Aug 2024 16:19:38 +0300 Subject: [PATCH] repair: row_level: coroutinize repair_meta::get_full_row_hashes_source_op() --- repair/row_level.cc | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 8b78bd4079..bae9401b20 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1510,27 +1510,23 @@ private: gms::inet_address remote_node, unsigned node_idx, rpc::source& source) { - return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable { - return source().then([this, current_hashes, remote_node, node_idx] (std::optional> hash_cmd_opt) mutable { - if (hash_cmd_opt) { + while (std::optional> hash_cmd_opt = co_await source()) { repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value()); rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd)); if (hash_cmd.cmd == repair_stream_cmd::hash_data) { current_hashes->insert(hash_cmd.hash); - return make_ready_future(stop_iteration::no); } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) { - return make_ready_future(stop_iteration::yes); + co_return; } else if (hash_cmd.cmd == repair_stream_cmd::error) { throw std::runtime_error("get_full_row_hashes: Peer failed to process"); } else { throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd"); } - } else { + } + { _sink_source_for_get_full_row_hashes.mark_source_closed(node_idx); throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream"); - } - }); - }); + } } future<> get_full_row_hashes_sink_op(rpc::sink& sink) {