message: do not erase client's rpc call type

Currently we cast rpc call lambda to std::function (to put all lambdas
in one container) which removes its template properties and gives it
strictly typed interface. This is inconvenient since interface may either
receive an object by value (which requires copy during invocation),
or by reference (which require lifetime management), but not both. This
patch changes the implementation to no store lambda in one central
place, but create it with rpc::make_client() call when used. This way
template properties are preserved and send_message() arguments are
forwarded to rpc call with original types.

Reviewed-by: Asias He <asias@cloudius-systems.com>
This commit is contained in:
Gleb Natapov
2015-05-07 11:39:25 +03:00
committed by Avi Kivity
parent e17f88e948
commit 1a8c4b75f5

View File

@@ -232,7 +232,6 @@ private:
rpc::protocol<serializer, messaging_verb> _rpc;
rpc::protocol<serializer, messaging_verb>::server _server;
std::unordered_map<shard_id, shard_info, shard_id::hash> _clients;
std::unordered_map<messaging_verb, std::unique_ptr<handler_base>> _handlers;
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"))
: _listen_address(ip)
@@ -254,48 +253,24 @@ public:
static auto no_wait() {
return rpc::no_wait;
}
private:
template <typename Ret, typename Tuple>
struct tuple_to_handler_type;
template <typename Ret, typename... Args>
struct tuple_to_handler_type<Ret, std::tuple<Args...>> {
using type = handler<Ret(rpc::protocol<serializer, messaging_verb>::client&, Args...)>;
};
template <typename Ret, typename Tuple>
struct tuple_to_handler_type_oneway;
template <typename Ret, typename... Args>
struct tuple_to_handler_type_oneway<Ret, std::tuple<Args...>> {
using type = handler<future<>(rpc::protocol<serializer, messaging_verb>::client&, Args...)>;
};
public:
// Register a handler (a callback lambda) for verb
template <typename Func>
void register_handler(messaging_verb verb, Func&& func) {
auto rpc_handler = _rpc.register_handler(verb, std::move(func));
using Ret = typename function_traits<Func>::return_type;
using ArgsTuple = typename function_traits<Func>::args_as_tuple;
using handler_type = typename tuple_to_handler_type<Ret, ArgsTuple>::type;
_handlers.emplace(verb, std::make_unique<handler_type>(std::move(rpc_handler)));
_rpc.register_handler(verb, std::move(func));
}
template <typename Func>
void register_handler_oneway(messaging_verb verb, Func&& func) {
auto rpc_handler = _rpc.register_handler(verb, std::move(func));
using Ret = typename function_traits<Func>::return_type;
using ArgsTuple = typename function_traits<Func>::args_as_tuple;
using handler_type = typename tuple_to_handler_type_oneway<Ret, ArgsTuple>::type;
_handlers.emplace(verb, std::make_unique<handler_type>(std::move(rpc_handler)));
_rpc.register_handler(verb, std::move(func));
}
// Send a message for verb
template <typename MsgIn, typename... MsgOut>
future<MsgIn> send_message(messaging_verb verb, shard_id id, MsgOut... msg) {
future<MsgIn> send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) {
auto& rpc_client = get_rpc_client(id);
auto& rpc_handler = get_rpc_handler<MsgIn, MsgOut...>(verb);
return rpc_handler(rpc_client, std::move(msg)...).then_wrapped([this, id] (future<MsgIn> f) -> future<MsgIn> {
auto rpc_handler = _rpc.make_client<MsgIn(MsgOut...)>(verb);
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (future<MsgIn> f) -> future<MsgIn> {
try {
auto ret = f.get();
return make_ready_future<MsgIn>(std::move(std::get<0>(ret)));
@@ -309,10 +284,10 @@ public:
}
template <typename MsgIn, typename... MsgOut>
future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut... msg) {
future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) {
auto& rpc_client = get_rpc_client(id);
auto& rpc_handler = get_rpc_handler_oneway<MsgIn, MsgOut...>(verb);
return rpc_handler(rpc_client, std::move(msg)...).then_wrapped([this, id] (future<> f) -> future<> {
auto rpc_handler = _rpc.make_client<rpc::no_wait_type(MsgOut...)>(verb);
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (future<> f) -> future<> {
try {
f.get();
return make_ready_future<>();
@@ -340,22 +315,6 @@ private:
void remove_rpc_client(shard_id id) {
_clients.erase(id);
}
// Return a std::function<Ret (rpc::protocol::client, Ags...)> for verb
// which can be used by rpc client to start a rpc call
template <typename MsgIn, typename... MsgOut>
auto& get_rpc_handler(messaging_verb verb) {
handler_base* h = _handlers[verb].get();
using handler_type = handler<future<MsgIn>(rpc::protocol<serializer, messaging_verb>::client&, MsgOut...)>;
return static_cast<handler_type*>(h)->rpc_handler;
}
template <typename MsgIn, typename... MsgOut>
auto& get_rpc_handler_oneway(messaging_verb verb) {
handler_base* h = _handlers[verb].get();
using handler_type = handler<future<>(rpc::protocol<serializer, messaging_verb>::client&, MsgOut...)>;
return static_cast<handler_type*>(h)->rpc_handler;
}
};
extern distributed<messaging_service> _the_messaging_service;