diff --git a/repair/row_level.cc b/repair/row_level.cc index fbb799175a..4a339bbc80 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1499,20 +1499,19 @@ public: future get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx, shard_id dst_cpu_id) { if (remote_node == myip()) { - return get_full_row_hashes_handler(); + co_return co_await get_full_row_hashes_handler(); } auto current_hashes = make_lw_shared(); - return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id).then_unpack( - [this, current_hashes, remote_node, node_idx] - (rpc::sink& sink, rpc::source& source) mutable { - auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); - auto sink_op = get_full_row_hashes_sink_op(sink); - return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result(); - }).then([this, current_hashes] () mutable { + auto [sink, source] = co_await _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id); + co_await coroutine::all( + [&] { return get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); }, + [&] { return get_full_row_hashes_sink_op(sink); } + ); + { stats().rx_hashes_nr += current_hashes->size(); _metrics.rx_hashes_nr += current_hashes->size(); - return std::move(*current_hashes); - }); + co_return std::move(*current_hashes); + } } // RPC handler