From 21e01990ff2eb6d83f1cd655e3cd985635d1e4cd Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 4 Sep 2024 15:03:00 +0300 Subject: [PATCH] repair: row_level: coroutinize repair_meta::get_full_row_hashes_with_rpc_stream() The when_all_succeed() call is changed to the safer coroutine::when_all(), which avoids the temporary futures. --- repair/row_level.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index fbb799175a..4a339bbc80 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1499,20 +1499,19 @@ public: future get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx, shard_id dst_cpu_id) { if (remote_node == myip()) { - return get_full_row_hashes_handler(); + co_return co_await get_full_row_hashes_handler(); } auto current_hashes = make_lw_shared(); - return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id).then_unpack( - [this, current_hashes, remote_node, node_idx] - (rpc::sink& sink, rpc::source& source) mutable { - auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); - auto sink_op = get_full_row_hashes_sink_op(sink); - return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result(); - }).then([this, current_hashes] () mutable { + auto [sink, source] = co_await _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id); + co_await coroutine::all( + [&] { return get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); }, + [&] { return get_full_row_hashes_sink_op(sink); } + ); + { stats().rx_hashes_nr += current_hashes->size(); _metrics.rx_hashes_nr += current_hashes->size(); - return std::move(*current_hashes); - }); + co_return std::move(*current_hashes); + } } // RPC handler