repair: row_level: coroutinize repair_meta::get_full_row_hashes_source_op()

This commit is contained in:
Avi Kivity
2024-08-25 16:19:38 +03:00
parent f7d19e237d
commit 8e9ebd82fc

View File

@@ -1510,27 +1510,23 @@ private:
gms::inet_address remote_node,
unsigned node_idx,
rpc::source<repair_hash_with_cmd>& source) {
return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable {
return source().then([this, current_hashes, remote_node, node_idx] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
if (hash_cmd_opt) {
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_hashes->insert(hash_cmd.hash);
return make_ready_future<stop_iteration>(stop_iteration::no);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
co_return;
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
} else {
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
}
} else {
}
{
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
}
});
});
}
}
future<> get_full_row_hashes_sink_op(rpc::sink<repair_stream_cmd>& sink) {