repair: row_level: coroutinize repair_get_full_row_hashes_with_rpc_stream_process()

This commit is contained in:
Avi Kivity
2024-08-25 13:13:39 +03:00
parent 01a578f608
commit 61bbf452c6

View File

@@ -2174,29 +2174,32 @@ static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process
repair_stream_cmd status = std::get<0>(status_opt.value());
rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
if (status == repair_stream_cmd::get_full_row_hashes) {
return repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) {
repair_hash_set hashes = co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) -> future<repair_hash_set> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
return rm->get_full_row_hashes_handler().then([rm] (repair_hash_set hashes) {
repair_hash_set hashes = co_await rm->get_full_row_hashes_handler();
{
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
_metrics.tx_hashes_nr += hashes.size();
return hashes;
});
}).then([sink] (repair_hash_set hashes) mutable {
return do_with(std::move(hashes), [sink] (repair_hash_set& hashes) mutable {
return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}).then([sink] () mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
});
});
}).then([sink] () mutable {
return sink.flush();
}).then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::no);
co_return hashes;
}
});
{
for (const repair_hash& hash : hashes) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
{
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
}
}
{
co_await sink.flush();
}
{
co_return stop_iteration::no;
}
} else {
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}