repair: Add repair_get_row_diff_with_rpc_stream_process_op

It is the helper for the get_row_diff rpc stream verb handler.
This commit is contained in:
Asias He
2019-06-25 10:22:20 +08:00
parent b1188f299e
commit 049e793fe5

View File

@@ -1639,6 +1639,49 @@ public:
}
};
future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
gms::inet_address from,
uint32_t src_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
std::unordered_set<repair_hash>& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_set_diff.insert(hash_cmd.hash);
return make_ready_future<stop_iteration>(stop_iteration::no);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
if (inject_rpc_stream_error) {
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
}
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, set_diff = std::move(current_set_diff)] {
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
return rm->get_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows));
}).then([sink] (repair_rows_on_wire rows_on_wire) mutable {
if (rows_on_wire.empty()) {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
}
return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable {
return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}).then([sink] () mutable {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
});
});
}).then([sink] () mutable {
return sink.flush();
}).then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} else {
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
}
}
future<> repair_init_messaging_service_handler(repair_service& rs, distributed<db::system_distributed_keyspace>& sys_dist_ks, distributed<db::view::view_update_generator>& view_update_generator) {
_sys_dist_ks = &sys_dist_ks;
_view_update_generator = &view_update_generator;