repair: row_level: coroutinize repair_meta::get_full_row_hashes_with_rpc_stream()

The when_all_succeed() call is changed to the safer coroutine::when_all(),
which avoids the temporary futures.
This commit is contained in:
Avi Kivity
2024-09-04 15:03:00 +03:00
parent 572fbfde09
commit 21e01990ff

View File

@@ -1499,20 +1499,19 @@ public:
future<repair_hash_set>
get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (remote_node == myip()) {
return get_full_row_hashes_handler();
co_return co_await get_full_row_hashes_handler();
}
auto current_hashes = make_lw_shared<repair_hash_set>();
return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id).then_unpack(
[this, current_hashes, remote_node, node_idx]
(rpc::sink<repair_stream_cmd>& sink, rpc::source<repair_hash_with_cmd>& source) mutable {
auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
auto sink_op = get_full_row_hashes_sink_op(sink);
return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result();
}).then([this, current_hashes] () mutable {
auto [sink, source] = co_await _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id);
co_await coroutine::all(
[&] { return get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); },
[&] { return get_full_row_hashes_sink_op(sink); }
);
{
stats().rx_hashes_nr += current_hashes->size();
_metrics.rx_hashes_nr += current_hashes->size();
return std::move(*current_hashes);
});
co_return std::move(*current_hashes);
}
}
// RPC handler