Merge branch 'master' of github.com:cloudius-systems/urchin into sstable-excl

This commit is contained in:
Avi Kivity
2015-05-13 18:06:19 +03:00
3 changed files with 15 additions and 23 deletions

View File

@@ -195,7 +195,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> eps
}
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
return ms().send_message_oneway<void>(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2_msg)).then([] () {
return ms().send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2_msg)).then([] () {
return make_ready_future<>();
});
});
@@ -1181,7 +1181,7 @@ void gossiper::stop() {
// logger.info("Announcing shutdown");
// Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS);
for (inet_address ep : _live_endpoints) {
ms().send_message_oneway<void>(messaging_verb::GOSSIP_SHUTDOWN, get_shard_id(ep), ep).then([]{
ms().send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, get_shard_id(ep), ep).then([]{
});
}
}

View File

@@ -255,14 +255,17 @@ public:
// Send a message for verb
template <typename MsgIn, typename... MsgOut>
future<MsgIn> send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) {
auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) {
auto& rpc_client = get_rpc_client(id);
auto rpc_handler = _rpc.make_client<MsgIn(MsgOut...)>(verb);
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (future<MsgIn> f) -> future<MsgIn> {
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (auto&& f) {
try {
auto ret = f.get();
return make_ready_future<MsgIn>(std::move(std::get<0>(ret)));
} catch (std::runtime_error&) {
if (f.failed()) {
f.get();
assert(false); // never reached
}
return std::move(f);
} catch(...) {
// FIXME: we need to distinguish between a transport error and
// a server error.
// remove_rpc_client(id);
@@ -271,20 +274,9 @@ public:
});
}
template <typename MsgIn, typename... MsgOut>
future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) {
auto& rpc_client = get_rpc_client(id);
auto rpc_handler = _rpc.make_client<rpc::no_wait_type(MsgOut...)>(verb);
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (future<> f) -> future<> {
try {
f.get();
return make_ready_future<>();
} catch (std::runtime_error&) {
// FIXME: as above
// remove_rpc_client(id);
throw;
}
});
template <typename... MsgOut>
auto send_message_oneway(messaging_verb verb, shard_id id, MsgOut&&... msg) {
return send_message<rpc::no_wait_type>(std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
}
private:
// Return rpc::protocol::client for a shard which is a ip + cpuid pair.

View File

@@ -112,7 +112,7 @@ public:
{ep1, endpoint_state()},
};
gms::gossip_digest_ack2 ack2(std::move(eps));
return ms.send_message_oneway<void>(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2)).then([] () {
return ms.send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2)).then([] () {
print("Client sent gossip_digest_ack2 got reply = void\n");
return make_ready_future<>();
});
@@ -123,7 +123,7 @@ public:
print("=== %s ===\n", __func__);
auto id = get_shard_id();
empty_msg msg;
return ms.send_message_oneway<void>(messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(msg)).then([] () {
return ms.send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(msg)).then([] () {
print("Client sent gossip_shutdown got reply = void\n");
return make_ready_future<>();
});