From 8d9fb8a96cf77955ce9880f9c3fba9bf497d4963 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 10 May 2015 15:08:41 +0300 Subject: [PATCH] message: consolidate send_message() and send_message_oneway() send_message() and send_message_oneway() are almost identical, implement the later in terms of the former. The patch also fixes send_message() to work properly with MsgIn = void. Reviewed-by: Asias He --- gms/gossiper.cc | 4 ++-- message/messaging_service.hh | 30 +++++++++++------------------- tests/urchin/message.cc | 4 ++-- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index cfec4f6746..f6bfe25179 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -195,7 +195,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set eps } } gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map)); - return ms().send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2_msg)).then([] () { + return ms().send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2_msg)).then([] () { return make_ready_future<>(); }); }); @@ -1181,7 +1181,7 @@ void gossiper::stop() { // logger.info("Announcing shutdown"); // Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS); for (inet_address ep : _live_endpoints) { - ms().send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, get_shard_id(ep), ep).then([]{ + ms().send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, get_shard_id(ep), ep).then([]{ }); } } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index a7c708cb22..43ad32236b 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -255,14 +255,17 @@ public: // Send a message for verb template - future send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) { + auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) { auto& rpc_client = get_rpc_client(id); auto rpc_handler = _rpc.make_client(verb); - return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id] (future f) -> future { + return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id] (auto&& f) { try { - auto ret = f.get(); - return make_ready_future(std::move(std::get<0>(ret))); - } catch (std::runtime_error&) { + if (f.failed()) { + f.get(); + assert(false); // never reached + } + return std::move(f); + } catch(...) { // FIXME: we need to distinguish between a transport error and // a server error. // remove_rpc_client(id); @@ -271,20 +274,9 @@ public: }); } - template - future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) { - auto& rpc_client = get_rpc_client(id); - auto rpc_handler = _rpc.make_client(verb); - return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id] (future<> f) -> future<> { - try { - f.get(); - return make_ready_future<>(); - } catch (std::runtime_error&) { - // FIXME: as above - // remove_rpc_client(id); - throw; - } - }); + template + auto send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) { + return send_message(std::move(verb), std::move(id), std::forward(msg)...); } private: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. diff --git a/tests/urchin/message.cc b/tests/urchin/message.cc index b302664072..c237386c06 100644 --- a/tests/urchin/message.cc +++ b/tests/urchin/message.cc @@ -112,7 +112,7 @@ public: {ep1, endpoint_state()}, }; gms::gossip_digest_ack2 ack2(std::move(eps)); - return ms.send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2)).then([] () { + return ms.send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2)).then([] () { print("Client sent gossip_digest_ack2 got reply = void\n"); return make_ready_future<>(); }); @@ -123,7 +123,7 @@ public: print("=== %s ===\n", __func__); auto id = get_shard_id(); empty_msg msg; - return ms.send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(msg)).then([] () { + return ms.send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(msg)).then([] () { print("Client sent gossip_shutdown got reply = void\n"); return make_ready_future<>(); });