messaging_service: Introduce send_message_timeout

It is used to send a message with timeout.
This commit is contained in:
Asias He
2015-09-06 14:42:45 +08:00
parent 9b1d08d734
commit 06fb6f6b30

View File

@@ -24,6 +24,7 @@ using gossip_digest_syn = gms::gossip_digest_syn;
using gossip_digest_ack = gms::gossip_digest_ack;
using gossip_digest_ack2 = gms::gossip_digest_ack2;
using rpc_protocol = rpc::protocol<serializer, messaging_verb>;
using namespace std::chrono_literals;
template <typename Output>
void net::serializer::write(Output& out, const gms::gossip_digest_syn& v) const {
@@ -309,6 +310,31 @@ auto send_message(messaging_service* ms, messaging_verb verb, shard_id id, MsgOu
});
}
// TODO: Remove duplicated code in send_message
template <typename MsgIn, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, shard_id id, std::chrono::milliseconds timeout, MsgOut&&... msg) {
auto rpc_client_ptr = ms->get_rpc_client(id);
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
auto& rpc_client = *rpc_client_ptr;
return rpc_handler(rpc_client, timeout, std::forward<MsgOut>(msg)...).then_wrapped([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (auto&& f) {
try {
if (f.failed()) {
ms->increment_dropped_messages(verb);
f.get();
assert(false); // never reached
}
return std::move(f);
} catch (rpc::closed_error) {
// This is a transport error
ms->remove_rpc_client(id);
throw;
} catch (...) {
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
throw;
}
});
}
// Send one way message for verb
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, shard_id id, MsgOut&&... msg) {