From 61bbf452c6bab5f088eceafd4920eebecafaea83 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 25 Aug 2024 13:13:39 +0300 Subject: [PATCH] repair: row_level: coroutinize repair_get_full_row_hashes_with_rpc_stream_process() --- repair/row_level.cc | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index a626369c9d..84b269f286 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2174,29 +2174,32 @@ static future repair_get_full_row_hashes_with_rpc_stream_process repair_stream_cmd status = std::get<0>(status_opt.value()); rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status)); if (status == repair_stream_cmd::get_full_row_hashes) { - return repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) { + repair_hash_set hashes = co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) -> future { auto rm = local_repair.get_repair_meta(from, repair_meta_id); rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started); - return rm->get_full_row_hashes_handler().then([rm] (repair_hash_set hashes) { + repair_hash_set hashes = co_await rm->get_full_row_hashes_handler(); + { rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started); _metrics.tx_hashes_nr += hashes.size(); - return hashes; - }); - }).then([sink] (repair_hash_set hashes) mutable { - return do_with(std::move(hashes), [sink] (repair_hash_set& hashes) mutable { - return do_for_each(hashes, [sink] (const repair_hash& hash) mutable { - return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); - }).then([sink] () mutable { - return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); - }); - }); - }).then([sink] () mutable { - return sink.flush(); - }).then([sink] { - return make_ready_future(stop_iteration::no); + co_return hashes; + } }); + { + for (const repair_hash& hash : hashes) { + co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); + } + { + co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); + } + } + { + co_await sink.flush(); + } + { + co_return stop_iteration::no; + } } else { - return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + throw std::runtime_error("Got unexpected repair_stream_cmd"); } }