From 6191fcb62eea937c94c19edced9e6571d80a44b2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 15 Jul 2015 18:38:29 +0800 Subject: [PATCH] messaging_service: Add wrapper for STREAM_INIT_MESSAGE verb This patch serves as an example of how we can add wrappers for ms.send_message and ms.register_handler. When we convert all the users of them, we can make messaging_service.hh do not include rpc.hh. --- message/messaging_service.cc | 9 +++++++++ message/messaging_service.hh | 10 ++++++++++ streaming/stream_session.cc | 5 ++--- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index a2fecfc995..0a043a45fa 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -7,6 +7,7 @@ #include "gms/failure_detector.hh" #include "gms/gossiper.hh" #include "service/storage_service.hh" +#include "streaming/messages/stream_init_message.hh" namespace net { @@ -198,4 +199,12 @@ void messaging_service::remove_rpc_client(shard_id id) { _clients.erase(id); } +future messaging_service::send_stream_init_message(shard_id id, streaming::messages::stream_init_message&& msg, unsigned src_cpu_id) { + return send_message(messaging_verb::STREAM_INIT_MESSAGE, std::move(id), std::move(msg), std::move(src_cpu_id)); +} + +void messaging_service::register_stream_init_message(std::function (streaming::messages::stream_init_message msg, unsigned src_cpu_id)>&& func) { + register_handler(messaging_verb::STREAM_INIT_MESSAGE, std::move(func)); +} + } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 2a3033d5d6..3d909c94ec 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -20,6 +20,11 @@ #include "db/serializer.hh" #include "mutation_query.hh" +// forward declarations +namespace streaming { namespace messages { + class stream_init_message; +}} + namespace net { /* All verb handler identifiers */ @@ -342,6 +347,11 @@ public: auto send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) { return send_message(std::move(verb), std::move(id), std::forward(msg)...); } + + // Wrapper fro STREAM_INIT_MESSAGE verb + future send_stream_init_message(shard_id id, streaming::messages::stream_init_message&& msg, unsigned src_cpu_id); + void register_stream_init_message(std::function (streaming::messages::stream_init_message msg, unsigned src_cpu_id)>&& func); + private: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. rpc_protocol::client& get_rpc_client(shard_id id); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 6b9283abab..4bcaab6c59 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -43,7 +43,7 @@ namespace streaming { thread_local logging::logger sslog("stream_session"); void stream_session::init_messaging_service_handler() { - ms().register_handler(messaging_verb::STREAM_INIT_MESSAGE, [] (messages::stream_init_message msg, unsigned src_cpu_id) { + ms().register_stream_init_message([] (messages::stream_init_message msg, unsigned src_cpu_id) { auto dst_cpu_id = engine().cpu_id(); sslog.debug("GOT STREAM_INIT_MESSAGE"); return smp::submit_to(dst_cpu_id, [msg = std::move(msg), src_cpu_id, dst_cpu_id] () mutable { @@ -174,8 +174,7 @@ future<> stream_session::initiate() { auto id = shard_id{this->peer, 0}; this->src_cpu_id = engine().cpu_id(); sslog.debug("SEND SENDSTREAM_INIT_MESSAGE to {}", id); - return ms().send_message(messaging_verb::STREAM_INIT_MESSAGE, - std::move(id), std::move(msg), this->src_cpu_id).then([this] (unsigned dst_cpu_id) { + return ms().send_stream_init_message(std::move(id), std::move(msg), this->src_cpu_id).then([this] (unsigned dst_cpu_id) { sslog.debug("GOT STREAM_INIT_MESSAGE Reply: dst_cpu_id={}", dst_cpu_id); this->dst_cpu_id = dst_cpu_id; });