Merge branch 'gleb/merge' of github.com:cloudius-systems/seastar-dev into db

Merge Gleb's merge of the rpc changes into seastar.
This commit is contained in:
Avi Kivity
2015-05-05 10:41:34 +03:00
4 changed files with 141 additions and 31 deletions

View File

@@ -153,6 +153,11 @@ public:
basic_sstring(const char_type* b, const char_type* e) : basic_sstring(b, e - b) {}
basic_sstring(const std::basic_string<char_type>& s)
: basic_sstring(s.data(), s.size()) {}
template <typename InputIterator>
basic_sstring(InputIterator first, InputIterator last)
: basic_sstring(initialized_later(), std::distance(first, last)) {
std::copy(first, last, begin());
}
~basic_sstring() noexcept {
if (is_external()) {
std::free(u.external.str);

View File

@@ -131,7 +131,7 @@ struct serializer {
throw rpc::closed_error();
}
bytes_view bv(reinterpret_cast<const int8_t*>(buf.get()), serialize_string_size);
v = read_simple_short_string(bv);
new (&v) sstring(read_simple_short_string(bv));
return make_ready_future<>();
});
});
@@ -161,7 +161,7 @@ struct serializer {
throw rpc::closed_error();
}
bytes_view bv(reinterpret_cast<const int8_t*>(buf.get()), sz);
v = v.deserialize(bv);
new (&v) T(T::deserialize(bv));
return make_ready_future<>();
});
});

View File

@@ -43,23 +43,33 @@ inline std::enable_if_t<N != sizeof...(T), future<>> marshall(Serializer& serial
});
}
template<std::size_t N, typename Serializer, typename... T>
inline std::enable_if_t<N == sizeof...(T), future<>> unmarshall(Serializer&, input_stream<char>&, std::tuple<T&...>&&) {
// ArgsReady is a functor that will be called after each element is deserialized.
// It gets element's position in a tuple as a parameter. It is used by argument
// desererialization to mark already deserialized element as containing valid values
// that needs to be destroyed by a destructor.
template<std::size_t N, typename Serializer, typename ArgReady, typename... T>
inline std::enable_if_t<N == sizeof...(T), future<>> unmarshall(Serializer&, input_stream<char>&, std::tuple<T&...>&&, ArgReady&& argready) {
return make_ready_future<>();
}
template<std::size_t N = 0, typename Serializer, typename... T>
inline std::enable_if_t<N != sizeof...(T), future<>> unmarshall(Serializer& deserialize, input_stream<char>& in, std::tuple<T&...>&& args) {
template<std::size_t N = 0, typename Serializer, typename ArgReady, typename... T>
inline std::enable_if_t<N != sizeof...(T), future<>> unmarshall(Serializer& deserialize, input_stream<char>& in, std::tuple<T&...>&& args, ArgReady&& argready) {
// And you may ask yourself "What is that beautiful house?"^H^H^H^H "Why
// make_ready_future() here?". And there answer would be to convert
// exception thrown by deserialize info a future
return make_ready_future().then([&deserialize, &in, args = std::move(args)] {
return deserialize(in, std::get<N>(args)).then([&deserialize, &in, args = std::move(args)] () mutable {
return unmarshall<N + 1>(deserialize, in, std::move(args));
return make_ready_future().then([&deserialize, &in, args = std::move(args), argready = std::forward<ArgReady>(argready)] () mutable {
return deserialize(in, std::get<N>(args)).then([&deserialize, &in, args = std::move(args), argready = std::forward<ArgReady>(argready)] () mutable {
argready(N);
return unmarshall<N + 1>(deserialize, in, std::move(args), std::forward<ArgReady>(argready));
});
});
}
template<typename Serializer, typename... T>
inline future<> unmarshall(Serializer& deserializer, input_stream<char>& in, std::tuple<T&...>&& args) {
return unmarshall(deserializer, in, std::move(args), [](std::size_t n){});
}
// ref_tuple gets tuple and returns another tuple with references to members of received tuple
template<typename... T, std::size_t... I>
inline std::tuple<T&...> ref_tuple_impl(std::tuple<T...>& t, std::index_sequence<I...>) {
@@ -77,11 +87,31 @@ struct reply_payload_base {
template <typename T>
struct reply_payload : reply_payload_base {
T v;
void value_set() {
v_set = true;
}
void value_set(T&& v) {
new (&u.v) T(std::move(v));
value_set();
}
union U {
U() {}
~U() {}
typename std::aligned_storage<sizeof(T), alignof(T)>::type pad;
T v;
} u;
~reply_payload() {
if (v_set) {
u.v.~T();
}
}
private:
bool v_set = false; // set it to true when U::v is valid object
};
template<>
struct reply_payload<void> : reply_payload_base {
void value_set() {}
};
template<typename Payload, typename... T>
@@ -90,6 +120,7 @@ struct rcv_reply_base : reply_payload<Payload> {
promise<T...> p;
template<typename... V>
void set_value(V&&... v) {
this->value_set();
done = true;
p.set_value(std::forward<V>(v)...);
}
@@ -103,8 +134,8 @@ struct rcv_reply_base : reply_payload<Payload> {
template<typename Serializer, typename MsgType, typename T>
struct rcv_reply : rcv_reply_base<T, T> {
inline future<> get_reply(typename protocol<Serializer, MsgType>::client& dst) {
return unmarshall(dst.serializer(), dst.in(), std::tie(this->v)).then([this] {
this->set_value(this->v);
return unmarshall(dst.serializer(), dst.in(), std::tie(this->u.v)).then([this] {
this->set_value(this->u.v);
});
}
};
@@ -112,8 +143,8 @@ struct rcv_reply : rcv_reply_base<T, T> {
template<typename Serializer, typename MsgType, typename... T>
struct rcv_reply<Serializer, MsgType, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
inline future<> get_reply(typename protocol<Serializer, MsgType>::client& dst) {
return unmarshall(dst.serializer(), dst.in(), ref_tuple(this->v)).then([this] {
this->set_value(this->v);
return unmarshall(dst.serializer(), dst.in(), ref_tuple(this->u.v)).then([this] {
this->set_value(this->u.v);
});
}
};
@@ -126,13 +157,44 @@ struct rcv_reply<Serializer, MsgType, void> : rcv_reply_base<void, void> {
}
};
// structure to hold outgoing message parameters on a client side
// while they are serialized
template<typename MsgType, typename... T>
struct message {
struct out_message {
MsgType t;
id_type id = 0;
std::tuple<T...> args;
message() = default;
message(MsgType xt, id_type xid, T&&... xargs) : t(xt), id(xid), args(std::forward<T>(xargs)...) {}
out_message() = delete;
out_message(MsgType xt, id_type xid, T&&... xargs) : t(xt), id(xid), args(std::forward<T>(xargs)...) {}
};
// structure to desrialize incoming message parameters to on a server side
template<typename MsgType, typename... T>
struct in_message {
using args_type = std::tuple<T...>;
id_type id = 0;
bool ready[sizeof...(T)] = {};
union U {
U() {}
~U() {}
typename std::aligned_storage<sizeof(args_type), alignof(args_type)>::type storage;
args_type args;
} u;
void set_ready(std::size_t n) {
assert(n < sizeof...(T));
ready[n] = true;
}
template<std::size_t... I>
inline void deleter(std::index_sequence<I...>) {
// this contraption calls tuple's element destructor if correspondent ready == true
int _[] = {0, (ready[I] && (std::get<I>(u.args).std::tuple_element<I, args_type>::type::~type(), true))...}; (void)_;
}
~in_message() {
deleter(std::make_index_sequence<sizeof...(T)>());
}
};
template<typename T1, typename T2>
@@ -185,6 +247,36 @@ inline auto wait_for_reply(typename protocol<Serializer, MsgType>::client& dst,
return std::move(sent);
}
template<typename Ex, typename... T> struct make_send_exception_helper {
auto operator()(Ex&& ex) {
return make_exception_future<T...>(std::move(ex));
}
};
template<typename Ex, typename... T> struct make_send_exception_helper<Ex, future<T...>> {
auto operator()(Ex&& ex) {
return make_exception_future<T...>(std::move(ex));
}
};
template<typename Ex> struct make_send_exception_helper<Ex, no_wait_type> {
auto operator()(Ex&& ex) {
return make_exception_future<>(std::move(ex));
}
};
template<typename Ex> struct make_send_exception_helper<Ex, void> {
auto operator()(Ex&& ex) {
return make_exception_future<>(std::move(ex));
}
};
template<typename Ret, typename Ex>
inline auto make_send_exception(Ex&& ex) {
make_send_exception_helper<Ex, Ret> ex_maker;
return ex_maker(std::move(ex));
}
// Returns lambda that can be used to send rpc messages.
// The lambda gets client connection and rpc parameters as arguments, marshalls them sends
// to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
@@ -198,12 +290,12 @@ auto send_helper(MsgType t, std::index_sequence<I...>) {
int _[] = { 0, (assert_type<decltype(args), typename std::tuple_element<I, types>::type>(), 0)... }; (void)_;
if (dst.error()) {
throw closed_error();
return make_send_exception<typename F::return_type>(closed_error());
}
// send message
auto msg_id = dst.next_message_id();
auto m = std::make_unique<message<MsgType, typename std::tuple_element<I, types>::type...>>(t, msg_id, std::forward<decltype(args)>(args)...);
auto m = std::make_unique<out_message<MsgType, typename std::tuple_element<I, types>::type...>>(t, msg_id, std::forward<decltype(args)>(args)...);
auto xargs = std::tie(m->t, m->id, std::get<I>(m->args)...); // holds references to all message elements
promise<> sent; // will be fulfilled when data is sent
auto fsent = sent.get_future();
@@ -232,10 +324,10 @@ template<typename Serializer, typename MsgType, typename T>
struct snd_reply : snd_reply_base<Serializer, MsgType, T> {
snd_reply(id_type xid) : snd_reply_base<Serializer, MsgType, T>(xid) {}
inline void set_val(std::tuple<T>&& val) {
this->v = std::move(std::get<0>(val));
this->value_set(std::move(std::get<0>(val)));
}
inline future<> reply(typename protocol<Serializer, MsgType>::server::connection& client) {
return marshall(client.serializer(), client.out(), std::tie(this->id, this->v));
return marshall(client.serializer(), client.out(), std::tie(this->id, this->u.v));
}
};
@@ -243,10 +335,10 @@ template<typename Serializer, typename MsgType, typename... T>
struct snd_reply<Serializer, MsgType, future<T...>> : snd_reply_base<Serializer, MsgType, std::tuple<T...>> {
snd_reply(id_type xid) : snd_reply_base<Serializer, MsgType, std::tuple<T...>>(xid) {}
inline void set_val(std::tuple<T...>&& val) {
this->v = std::move(val);
this->value_set(std::move(val));
}
inline future<> reply(typename protocol<Serializer, MsgType>::server::connection& client) {
return marshall(client.serializer(), client.out(), std::tuple_cat(std::tie(this->id), ref_tuple(this->v)));
return marshall(client.serializer(), client.out(), std::tuple_cat(std::tie(this->id), ref_tuple(this->u.v)));
}
};
@@ -287,17 +379,17 @@ inline future<> reply(std::unique_ptr<snd_reply<Serializer, MsgType, Ret>>& r, t
// build callback arguments tuple depending on whether it gets client_info as a first parameter
template<bool Info, typename MsgType, typename... M>
inline auto make_apply_args(client_info& info, std::unique_ptr<message<MsgType, M...>>& m, std::enable_if_t<!Info, void*> = nullptr) {
return std::move(m->args);
inline auto make_apply_args(client_info& info, std::unique_ptr<in_message<MsgType, M...>>& m, std::enable_if_t<!Info, void*> = nullptr) {
return std::move(m->u.args);
}
template<bool Info, typename MsgType, typename... M>
inline auto make_apply_args(client_info& info, std::unique_ptr<message<MsgType, M...>>& m, std::enable_if_t<Info, void*> = nullptr) {
return std::tuple_cat(std::make_tuple(std::cref(info)), std::move(m->args));
inline auto make_apply_args(client_info& info, std::unique_ptr<in_message<MsgType, M...>>& m, std::enable_if_t<Info, void*> = nullptr) {
return std::tuple_cat(std::make_tuple(std::cref(info)), std::move(m->u.args));
}
template<typename Ret, bool Info, typename Serializer, typename MsgType, typename Func, typename... M>
inline future<std::unique_ptr<snd_reply<Serializer, MsgType, Ret>>> apply(Func& func, client_info& info, std::unique_ptr<message<MsgType, M...>>&& m) {
inline future<std::unique_ptr<snd_reply<Serializer, MsgType, Ret>>> apply(Func& func, client_info& info, std::unique_ptr<in_message<MsgType, M...>>&& m) {
using futurator = futurize<Ret>;
auto r = std::make_unique<snd_reply<Serializer, MsgType, Ret>>(m->id);
try {
@@ -335,9 +427,14 @@ template<typename F, typename Serializer, typename MsgType, bool Info, typename
auto recv_helper(std::index_sequence<I...>, Func&& func) {
return [func = lref_to_cref(std::forward<Func>(func))](lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client) mutable {
// create message to hold all received values
auto m = std::make_unique<message<MsgType, typename F::template arg<I>::type...>>();
auto xargs = std::tie(m->id, std::get<I>(m->args)...); // holds reference to all message elements
return unmarshall(client->serializer(), client->in(), std::move(xargs)).then([client, m = std::move(m), &func] () mutable {
auto m = std::make_unique<in_message<MsgType, typename F::template arg<I>::type...>>();
auto argready = [mptr = m.get()] (std::size_t n) {
if (n) { // skip first element since it is not part of a message tuple
mptr->set_ready(n - 1);
}
};
auto xargs = std::tie(m->id, std::get<I>(m->u.args)...); // holds reference to all message elements
return unmarshall(client->serializer(), client->in(), std::move(xargs), std::move(argready)).then([client, m = std::move(m), &func] () mutable {
// note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
apply<typename F::return_type, Info, Serializer>(func, client->info(), std::move(m)).then([client] (std::unique_ptr<snd_reply<Serializer, MsgType, typename F::return_type>>&& r) {
client->out_ready() = client->out_ready().then([client, r = std::move(r)] () mutable {
@@ -488,6 +585,7 @@ protocol<Serializer, MsgType>::client::client(protocol<Serializer, MsgType>& pro
}
});
}).finally([this] () {
this->_error = true;
this->_write_buf.close();
_outstanding.clear();
});

View File

@@ -24,6 +24,7 @@
#include <boost/test/included/unit_test.hpp>
#include "core/sstring.hh"
#include <list>
BOOST_AUTO_TEST_CASE(test_equality) {
BOOST_REQUIRE_EQUAL(sstring("aaa"), sstring("aaa"));
@@ -121,3 +122,9 @@ BOOST_AUTO_TEST_CASE(test_erase) {
BOOST_REQUIRE_EQUAL(str, "adef");
BOOST_REQUIRE_THROW(str.erase(str.begin() + 5, str.begin() + 6), std::out_of_range);
}
BOOST_AUTO_TEST_CASE(test_ctor_iterator) {
std::list<char> data{{'a', 'b', 'c'}};
sstring s(data.begin(), data.end());
BOOST_REQUIRE_EQUAL(s, "abc");
}