repair: Add get_row_diff_source_op

It is a helper that works on the source() of the
get_row_diff rpc stream verb.
This commit is contained in:
Asias He
2019-06-25 10:13:37 +08:00
committed by ME
parent 473bd7599c
commit a1e19514f9

View File

@@ -56,6 +56,8 @@ struct shard_config {
sstring partitioner_name;
};
static bool inject_rpc_stream_error = false;
distributed<db::system_distributed_keyspace>* _sys_dist_ks;
distributed<db::view::view_update_generator>* _view_update_generator;
@@ -1426,6 +1428,44 @@ public:
return make_ready_future<>();
}
private:
future<> get_row_diff_source_op(
update_peer_row_hash_sets update_hash_set,
gms::inet_address remote_node,
unsigned node_idx,
rpc::sink<repair_hash_with_cmd>& sink,
rpc::source<repair_row_on_wire_with_cmd>& source) {
auto current_rows = make_lw_shared<repair_rows_on_wire>();
return repeat([this, current_rows, update_hash_set, remote_node, node_idx, &sink, &source] () mutable {
return source().then([this, current_rows, update_hash_set, remote_node, node_idx] (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) mutable {
if (row_opt) {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff: Inject sender error in source loop");
}
auto row = std::move(std::get<0>(row_opt.value()));
if (row.cmd == repair_stream_cmd::row_data) {
rlogger.trace("get_row_diff: Got repair_row_on_wire with data");
current_rows->push_back(std::move(row.row));
return make_ready_future<stop_iteration>(stop_iteration::no);
} else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt");
return apply_rows(std::move(*current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx).then([current_rows] {
current_rows->clear();
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
} else if (row.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_row_diff: Peer failed to process");
} else {
throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd");
}
} else {
_sink_source_for_get_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("get_row_diff: Got unexpected end of stream");
}
});
});
}
public:
// RPC handler
future<repair_rows_on_wire> get_row_diff_handler(const std::unordered_set<repair_hash>& set_diff, needs_all_rows_t needs_all_rows) {
return with_gate(_gate, [this, &set_diff, needs_all_rows] {