From 5f25969da33e4cec5aa205d6724ec17be7debcbc Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:25:13 +0800 Subject: [PATCH] repair: Add repair_get_full_row_hashes_with_rpc_stream_process_op It is the helper for the get_full_row_hashes rpc stream verb handler. --- repair/row_level.cc | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 4189190c00..3326547876 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1713,6 +1713,41 @@ future repair_put_row_diff_with_rpc_stream_process_op( } } +future repair_get_full_row_hashes_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool &error, + std::optional> status_opt) { + repair_stream_cmd status = std::get<0>(status_opt.value()); + rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status)); + if (status == repair_stream_cmd::get_full_row_hashes) { + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->get_full_row_hashes_handler().then([] (std::unordered_set hashes) { + _metrics.tx_hashes_nr += hashes.size(); + return hashes; + }); + }).then([sink] (std::unordered_set hashes) mutable { + return do_with(std::move(hashes), [sink] (std::unordered_set& hashes) mutable { + return do_for_each(hashes, [sink] (const repair_hash& hash) mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); + }).then([sink] () mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); + }); + }); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator;