messaging_service: Add wrapper for STREAM_INIT_MESSAGE verb

This patch serves as an example of how we can add wrappers for
ms.send_message and ms.register_handler.

When we convert all the users of them, we can make messaging_service.hh
do not include rpc.hh.
This commit is contained in:
Asias He
2015-07-15 18:38:29 +08:00
committed by Avi Kivity
parent cce2c648c5
commit 6191fcb62e
3 changed files with 21 additions and 3 deletions

View File

@@ -7,6 +7,7 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "streaming/messages/stream_init_message.hh"
namespace net {
@@ -198,4 +199,12 @@ void messaging_service::remove_rpc_client(shard_id id) {
_clients.erase(id);
}
future<unsigned> messaging_service::send_stream_init_message(shard_id id, streaming::messages::stream_init_message&& msg, unsigned src_cpu_id) {
return send_message<unsigned>(messaging_verb::STREAM_INIT_MESSAGE, std::move(id), std::move(msg), std::move(src_cpu_id));
}
void messaging_service::register_stream_init_message(std::function<future<unsigned> (streaming::messages::stream_init_message msg, unsigned src_cpu_id)>&& func) {
register_handler(messaging_verb::STREAM_INIT_MESSAGE, std::move(func));
}
} // namespace net

View File

@@ -20,6 +20,11 @@
#include "db/serializer.hh"
#include "mutation_query.hh"
// forward declarations
namespace streaming { namespace messages {
class stream_init_message;
}}
namespace net {
/* All verb handler identifiers */
@@ -342,6 +347,11 @@ public:
auto send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) {
return send_message<rpc::no_wait_type>(std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
}
// Wrapper fro STREAM_INIT_MESSAGE verb
future<unsigned> send_stream_init_message(shard_id id, streaming::messages::stream_init_message&& msg, unsigned src_cpu_id);
void register_stream_init_message(std::function<future<unsigned> (streaming::messages::stream_init_message msg, unsigned src_cpu_id)>&& func);
private:
// Return rpc::protocol::client for a shard which is a ip + cpuid pair.
rpc_protocol::client& get_rpc_client(shard_id id);

View File

@@ -43,7 +43,7 @@ namespace streaming {
thread_local logging::logger sslog("stream_session");
void stream_session::init_messaging_service_handler() {
ms().register_handler(messaging_verb::STREAM_INIT_MESSAGE, [] (messages::stream_init_message msg, unsigned src_cpu_id) {
ms().register_stream_init_message([] (messages::stream_init_message msg, unsigned src_cpu_id) {
auto dst_cpu_id = engine().cpu_id();
sslog.debug("GOT STREAM_INIT_MESSAGE");
return smp::submit_to(dst_cpu_id, [msg = std::move(msg), src_cpu_id, dst_cpu_id] () mutable {
@@ -174,8 +174,7 @@ future<> stream_session::initiate() {
auto id = shard_id{this->peer, 0};
this->src_cpu_id = engine().cpu_id();
sslog.debug("SEND SENDSTREAM_INIT_MESSAGE to {}", id);
return ms().send_message<unsigned>(messaging_verb::STREAM_INIT_MESSAGE,
std::move(id), std::move(msg), this->src_cpu_id).then([this] (unsigned dst_cpu_id) {
return ms().send_stream_init_message(std::move(id), std::move(msg), this->src_cpu_id).then([this] (unsigned dst_cpu_id) {
sslog.debug("GOT STREAM_INIT_MESSAGE Reply: dst_cpu_id={}", dst_cpu_id);
this->dst_cpu_id = dst_cpu_id;
});