diff --git a/repair/row_level.cc b/repair/row_level.cc index c7c0c656ef..ad31bc3596 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1639,6 +1639,49 @@ public: } }; +future repair_get_row_diff_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool &error, + std::unordered_set& current_set_diff, + std::optional> hash_cmd_opt) { + repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value()); + rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd)); + if (hash_cmd.cmd == repair_stream_cmd::hash_data) { + current_set_diff.insert(hash_cmd.hash); + return make_ready_future(stop_iteration::no); + } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) { + if (inject_rpc_stream_error) { + return make_exception_future(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop")); + } + bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows; + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, set_diff = std::move(current_set_diff)] { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->get_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows)); + }).then([sink] (repair_rows_on_wire rows_on_wire) mutable { + if (rows_on_wire.empty()) { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}); + } + return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable { + return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)}); + }).then([sink] () mutable { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}); + }); + }); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator;