From 37b3de4ea08cdfd26f8eea3453ff87f6f544d604 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:40:09 +0800 Subject: [PATCH] messaging_service: Add REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM support It is used by row level repair. --- message/messaging_service.cc | 16 ++++++++++++++++ message/messaging_service.hh | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index fd8287ad6d..64eabab375 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -739,6 +739,22 @@ void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::functi register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM +future, rpc::source> +messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func)); +} + // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index aafbd846eb..62fecf3c44 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -287,6 +287,11 @@ public: rpc::sink make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source); void register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source); + void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id);