message: do not erase client's rpc call type
Currently we cast rpc call lambda to std::function (to put all lambdas in one container) which removes its template properties and gives it strictly typed interface. This is inconvenient since interface may either receive an object by value (which requires copy during invocation), or by reference (which require lifetime management), but not both. This patch changes the implementation to no store lambda in one central place, but create it with rpc::make_client() call when used. This way template properties are preserved and send_message() arguments are forwarded to rpc call with original types. Reviewed-by: Asias He <asias@cloudius-systems.com>
This commit is contained in:
@@ -232,7 +232,6 @@ private:
|
||||
rpc::protocol<serializer, messaging_verb> _rpc;
|
||||
rpc::protocol<serializer, messaging_verb>::server _server;
|
||||
std::unordered_map<shard_id, shard_info, shard_id::hash> _clients;
|
||||
std::unordered_map<messaging_verb, std::unique_ptr<handler_base>> _handlers;
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"))
|
||||
: _listen_address(ip)
|
||||
@@ -254,48 +253,24 @@ public:
|
||||
static auto no_wait() {
|
||||
return rpc::no_wait;
|
||||
}
|
||||
private:
|
||||
template <typename Ret, typename Tuple>
|
||||
struct tuple_to_handler_type;
|
||||
|
||||
template <typename Ret, typename... Args>
|
||||
struct tuple_to_handler_type<Ret, std::tuple<Args...>> {
|
||||
using type = handler<Ret(rpc::protocol<serializer, messaging_verb>::client&, Args...)>;
|
||||
};
|
||||
|
||||
template <typename Ret, typename Tuple>
|
||||
struct tuple_to_handler_type_oneway;
|
||||
|
||||
template <typename Ret, typename... Args>
|
||||
struct tuple_to_handler_type_oneway<Ret, std::tuple<Args...>> {
|
||||
using type = handler<future<>(rpc::protocol<serializer, messaging_verb>::client&, Args...)>;
|
||||
};
|
||||
public:
|
||||
// Register a handler (a callback lambda) for verb
|
||||
template <typename Func>
|
||||
void register_handler(messaging_verb verb, Func&& func) {
|
||||
auto rpc_handler = _rpc.register_handler(verb, std::move(func));
|
||||
using Ret = typename function_traits<Func>::return_type;
|
||||
using ArgsTuple = typename function_traits<Func>::args_as_tuple;
|
||||
using handler_type = typename tuple_to_handler_type<Ret, ArgsTuple>::type;
|
||||
_handlers.emplace(verb, std::make_unique<handler_type>(std::move(rpc_handler)));
|
||||
_rpc.register_handler(verb, std::move(func));
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
void register_handler_oneway(messaging_verb verb, Func&& func) {
|
||||
auto rpc_handler = _rpc.register_handler(verb, std::move(func));
|
||||
using Ret = typename function_traits<Func>::return_type;
|
||||
using ArgsTuple = typename function_traits<Func>::args_as_tuple;
|
||||
using handler_type = typename tuple_to_handler_type_oneway<Ret, ArgsTuple>::type;
|
||||
_handlers.emplace(verb, std::make_unique<handler_type>(std::move(rpc_handler)));
|
||||
_rpc.register_handler(verb, std::move(func));
|
||||
}
|
||||
|
||||
// Send a message for verb
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
future<MsgIn> send_message(messaging_verb verb, shard_id id, MsgOut... msg) {
|
||||
future<MsgIn> send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) {
|
||||
auto& rpc_client = get_rpc_client(id);
|
||||
auto& rpc_handler = get_rpc_handler<MsgIn, MsgOut...>(verb);
|
||||
return rpc_handler(rpc_client, std::move(msg)...).then_wrapped([this, id] (future<MsgIn> f) -> future<MsgIn> {
|
||||
auto rpc_handler = _rpc.make_client<MsgIn(MsgOut...)>(verb);
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (future<MsgIn> f) -> future<MsgIn> {
|
||||
try {
|
||||
auto ret = f.get();
|
||||
return make_ready_future<MsgIn>(std::move(std::get<0>(ret)));
|
||||
@@ -309,10 +284,10 @@ public:
|
||||
}
|
||||
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut... msg) {
|
||||
future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) {
|
||||
auto& rpc_client = get_rpc_client(id);
|
||||
auto& rpc_handler = get_rpc_handler_oneway<MsgIn, MsgOut...>(verb);
|
||||
return rpc_handler(rpc_client, std::move(msg)...).then_wrapped([this, id] (future<> f) -> future<> {
|
||||
auto rpc_handler = _rpc.make_client<rpc::no_wait_type(MsgOut...)>(verb);
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (future<> f) -> future<> {
|
||||
try {
|
||||
f.get();
|
||||
return make_ready_future<>();
|
||||
@@ -340,22 +315,6 @@ private:
|
||||
void remove_rpc_client(shard_id id) {
|
||||
_clients.erase(id);
|
||||
}
|
||||
|
||||
// Return a std::function<Ret (rpc::protocol::client, Ags...)> for verb
|
||||
// which can be used by rpc client to start a rpc call
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
auto& get_rpc_handler(messaging_verb verb) {
|
||||
handler_base* h = _handlers[verb].get();
|
||||
using handler_type = handler<future<MsgIn>(rpc::protocol<serializer, messaging_verb>::client&, MsgOut...)>;
|
||||
return static_cast<handler_type*>(h)->rpc_handler;
|
||||
}
|
||||
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
auto& get_rpc_handler_oneway(messaging_verb verb) {
|
||||
handler_base* h = _handlers[verb].get();
|
||||
using handler_type = handler<future<>(rpc::protocol<serializer, messaging_verb>::client&, MsgOut...)>;
|
||||
return static_cast<handler_type*>(h)->rpc_handler;
|
||||
}
|
||||
};
|
||||
|
||||
extern distributed<messaging_service> _the_messaging_service;
|
||||
|
||||
Reference in New Issue
Block a user