From 06fb6f6b30dfb8165020ab8a459f5eba4e4561d8 Mon Sep 17 00:00:00 2001 From: Asias He Date: Sun, 6 Sep 2015 14:42:45 +0800 Subject: [PATCH] messaging_service: Introduce send_message_timeout It is used to send a message with timeout. --- message/messaging_service.cc | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 0cd05c71f5..c1d7e29256 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -24,6 +24,7 @@ using gossip_digest_syn = gms::gossip_digest_syn; using gossip_digest_ack = gms::gossip_digest_ack; using gossip_digest_ack2 = gms::gossip_digest_ack2; using rpc_protocol = rpc::protocol; +using namespace std::chrono_literals; template void net::serializer::write(Output& out, const gms::gossip_digest_syn& v) const { @@ -309,6 +310,31 @@ auto send_message(messaging_service* ms, messaging_verb verb, shard_id id, MsgOu }); } +// TODO: Remove duplicated code in send_message +template +auto send_message_timeout(messaging_service* ms, messaging_verb verb, shard_id id, std::chrono::milliseconds timeout, MsgOut&&... msg) { + auto rpc_client_ptr = ms->get_rpc_client(id); + auto rpc_handler = ms->rpc()->make_client(verb); + auto& rpc_client = *rpc_client_ptr; + return rpc_handler(rpc_client, timeout, std::forward(msg)...).then_wrapped([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (auto&& f) { + try { + if (f.failed()) { + ms->increment_dropped_messages(verb); + f.get(); + assert(false); // never reached + } + return std::move(f); + } catch (rpc::closed_error) { + // This is a transport error + ms->remove_rpc_client(id); + throw; + } catch (...) { + // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. + throw; + } + }); +} + // Send one way message for verb template auto send_message_oneway(messaging_service* ms, messaging_verb verb, shard_id id, MsgOut&&... msg) {