repair: Add repair_get_full_row_hashes_with_rpc_stream_process_op

It is the helper for the get_full_row_hashes rpc stream verb handler.
This commit is contained in:
Asias He
2019-06-25 10:25:13 +08:00
parent 39d5a9446e
commit 5f25969da3

View File

@@ -1713,6 +1713,41 @@ future<stop_iteration> repair_put_row_diff_with_rpc_stream_process_op(
}
}
future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process_op(
gms::inet_address from,
uint32_t src_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_hash_with_cmd> sink,
rpc::source<repair_stream_cmd> source,
bool &error,
std::optional<std::tuple<repair_stream_cmd>> status_opt) {
repair_stream_cmd status = std::get<0>(status_opt.value());
rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
if (status == repair_stream_cmd::get_full_row_hashes) {
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
return rm->get_full_row_hashes_handler().then([] (std::unordered_set<repair_hash> hashes) {
_metrics.tx_hashes_nr += hashes.size();
return hashes;
});
}).then([sink] (std::unordered_set<repair_hash> hashes) mutable {
return do_with(std::move(hashes), [sink] (std::unordered_set<repair_hash>& hashes) mutable {
return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}).then([sink] () mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
});
});
}).then([sink] () mutable {
return sink.flush();
}).then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} else {
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
}
}
future<> repair_init_messaging_service_handler(repair_service& rs, distributed<db::system_distributed_keyspace>& sys_dist_ks, distributed<db::view::view_update_generator>& view_update_generator) {
_sys_dist_ks = &sys_dist_ks;
_view_update_generator = &view_update_generator;