diff --git a/repair/row_level.cc b/repair/row_level.cc index a7f864d084..0ddaebdcbf 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -56,6 +56,8 @@ struct shard_config { sstring partitioner_name; }; +static bool inject_rpc_stream_error = false; + distributed* _sys_dist_ks; distributed* _view_update_generator; @@ -1426,6 +1428,44 @@ public: return make_ready_future<>(); } +private: + future<> get_row_diff_source_op( + update_peer_row_hash_sets update_hash_set, + gms::inet_address remote_node, + unsigned node_idx, + rpc::sink& sink, + rpc::source& source) { + auto current_rows = make_lw_shared(); + return repeat([this, current_rows, update_hash_set, remote_node, node_idx, &sink, &source] () mutable { + return source().then([this, current_rows, update_hash_set, remote_node, node_idx] (std::optional> row_opt) mutable { + if (row_opt) { + if (inject_rpc_stream_error) { + throw std::runtime_error("get_row_diff: Inject sender error in source loop"); + } + auto row = std::move(std::get<0>(row_opt.value())); + if (row.cmd == repair_stream_cmd::row_data) { + rlogger.trace("get_row_diff: Got repair_row_on_wire with data"); + current_rows->push_back(std::move(row.row)); + return make_ready_future(stop_iteration::no); + } else if (row.cmd == repair_stream_cmd::end_of_current_rows) { + rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt"); + return apply_rows(std::move(*current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx).then([current_rows] { + current_rows->clear(); + return make_ready_future(stop_iteration::yes); + }); + } else if (row.cmd == repair_stream_cmd::error) { + throw std::runtime_error("get_row_diff: Peer failed to process"); + } else { + throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_get_row_diff.mark_source_closed(node_idx); + throw std::runtime_error("get_row_diff: Got unexpected end of stream"); + } + }); + }); + } +public: // RPC handler future get_row_diff_handler(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows) { return with_gate(_gate, [this, &set_diff, needs_all_rows] {