diff --git a/core/sstring.hh b/core/sstring.hh index c50006ab7d..d4e26144a2 100644 --- a/core/sstring.hh +++ b/core/sstring.hh @@ -153,6 +153,11 @@ public: basic_sstring(const char_type* b, const char_type* e) : basic_sstring(b, e - b) {} basic_sstring(const std::basic_string& s) : basic_sstring(s.data(), s.size()) {} + template + basic_sstring(InputIterator first, InputIterator last) + : basic_sstring(initialized_later(), std::distance(first, last)) { + std::copy(first, last, begin()); + } ~basic_sstring() noexcept { if (is_external()) { std::free(u.external.str); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 26f85005f9..26a27ac090 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -131,7 +131,7 @@ struct serializer { throw rpc::closed_error(); } bytes_view bv(reinterpret_cast(buf.get()), serialize_string_size); - v = read_simple_short_string(bv); + new (&v) sstring(read_simple_short_string(bv)); return make_ready_future<>(); }); }); @@ -161,7 +161,7 @@ struct serializer { throw rpc::closed_error(); } bytes_view bv(reinterpret_cast(buf.get()), sz); - v = v.deserialize(bv); + new (&v) T(T::deserialize(bv)); return make_ready_future<>(); }); }); diff --git a/rpc/rpc_impl.hh b/rpc/rpc_impl.hh index cfdca41c79..471006376c 100644 --- a/rpc/rpc_impl.hh +++ b/rpc/rpc_impl.hh @@ -43,23 +43,33 @@ inline std::enable_if_t> marshall(Serializer& serial }); } -template -inline std::enable_if_t> unmarshall(Serializer&, input_stream&, std::tuple&&) { +// ArgsReady is a functor that will be called after each element is deserialized. +// It gets element's position in a tuple as a parameter. It is used by argument +// desererialization to mark already deserialized element as containing valid values +// that needs to be destroyed by a destructor. +template +inline std::enable_if_t> unmarshall(Serializer&, input_stream&, std::tuple&&, ArgReady&& argready) { return make_ready_future<>(); } -template -inline std::enable_if_t> unmarshall(Serializer& deserialize, input_stream& in, std::tuple&& args) { +template +inline std::enable_if_t> unmarshall(Serializer& deserialize, input_stream& in, std::tuple&& args, ArgReady&& argready) { // And you may ask yourself "What is that beautiful house?"^H^H^H^H "Why // make_ready_future() here?". And there answer would be to convert // exception thrown by deserialize info a future - return make_ready_future().then([&deserialize, &in, args = std::move(args)] { - return deserialize(in, std::get(args)).then([&deserialize, &in, args = std::move(args)] () mutable { - return unmarshall(deserialize, in, std::move(args)); + return make_ready_future().then([&deserialize, &in, args = std::move(args), argready = std::forward(argready)] () mutable { + return deserialize(in, std::get(args)).then([&deserialize, &in, args = std::move(args), argready = std::forward(argready)] () mutable { + argready(N); + return unmarshall(deserialize, in, std::move(args), std::forward(argready)); }); }); } +template +inline future<> unmarshall(Serializer& deserializer, input_stream& in, std::tuple&& args) { + return unmarshall(deserializer, in, std::move(args), [](std::size_t n){}); +} + // ref_tuple gets tuple and returns another tuple with references to members of received tuple template inline std::tuple ref_tuple_impl(std::tuple& t, std::index_sequence) { @@ -77,11 +87,31 @@ struct reply_payload_base { template struct reply_payload : reply_payload_base { - T v; + void value_set() { + v_set = true; + } + void value_set(T&& v) { + new (&u.v) T(std::move(v)); + value_set(); + } + union U { + U() {} + ~U() {} + typename std::aligned_storage::type pad; + T v; + } u; + ~reply_payload() { + if (v_set) { + u.v.~T(); + } + } +private: + bool v_set = false; // set it to true when U::v is valid object }; template<> struct reply_payload : reply_payload_base { + void value_set() {} }; template @@ -90,6 +120,7 @@ struct rcv_reply_base : reply_payload { promise p; template void set_value(V&&... v) { + this->value_set(); done = true; p.set_value(std::forward(v)...); } @@ -103,8 +134,8 @@ struct rcv_reply_base : reply_payload { template struct rcv_reply : rcv_reply_base { inline future<> get_reply(typename protocol::client& dst) { - return unmarshall(dst.serializer(), dst.in(), std::tie(this->v)).then([this] { - this->set_value(this->v); + return unmarshall(dst.serializer(), dst.in(), std::tie(this->u.v)).then([this] { + this->set_value(this->u.v); }); } }; @@ -112,8 +143,8 @@ struct rcv_reply : rcv_reply_base { template struct rcv_reply> : rcv_reply_base, T...> { inline future<> get_reply(typename protocol::client& dst) { - return unmarshall(dst.serializer(), dst.in(), ref_tuple(this->v)).then([this] { - this->set_value(this->v); + return unmarshall(dst.serializer(), dst.in(), ref_tuple(this->u.v)).then([this] { + this->set_value(this->u.v); }); } }; @@ -126,13 +157,44 @@ struct rcv_reply : rcv_reply_base { } }; +// structure to hold outgoing message parameters on a client side +// while they are serialized template -struct message { +struct out_message { MsgType t; id_type id = 0; std::tuple args; - message() = default; - message(MsgType xt, id_type xid, T&&... xargs) : t(xt), id(xid), args(std::forward(xargs)...) {} + out_message() = delete; + out_message(MsgType xt, id_type xid, T&&... xargs) : t(xt), id(xid), args(std::forward(xargs)...) {} +}; + +// structure to desrialize incoming message parameters to on a server side +template +struct in_message { + using args_type = std::tuple; + id_type id = 0; + bool ready[sizeof...(T)] = {}; + union U { + U() {} + ~U() {} + typename std::aligned_storage::type storage; + args_type args; + } u; + + void set_ready(std::size_t n) { + assert(n < sizeof...(T)); + ready[n] = true; + } + + template + inline void deleter(std::index_sequence) { + // this contraption calls tuple's element destructor if correspondent ready == true + int _[] = {0, (ready[I] && (std::get(u.args).std::tuple_element::type::~type(), true))...}; (void)_; + } + + ~in_message() { + deleter(std::make_index_sequence()); + } }; template @@ -185,6 +247,36 @@ inline auto wait_for_reply(typename protocol::client& dst, return std::move(sent); } +template struct make_send_exception_helper { + auto operator()(Ex&& ex) { + return make_exception_future(std::move(ex)); + } +}; + +template struct make_send_exception_helper> { + auto operator()(Ex&& ex) { + return make_exception_future(std::move(ex)); + } +}; + +template struct make_send_exception_helper { + auto operator()(Ex&& ex) { + return make_exception_future<>(std::move(ex)); + } +}; + +template struct make_send_exception_helper { + auto operator()(Ex&& ex) { + return make_exception_future<>(std::move(ex)); + } +}; + +template +inline auto make_send_exception(Ex&& ex) { + make_send_exception_helper ex_maker; + return ex_maker(std::move(ex)); +} + // Returns lambda that can be used to send rpc messages. // The lambda gets client connection and rpc parameters as arguments, marshalls them sends // to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion @@ -198,12 +290,12 @@ auto send_helper(MsgType t, std::index_sequence) { int _[] = { 0, (assert_type::type>(), 0)... }; (void)_; if (dst.error()) { - throw closed_error(); + return make_send_exception(closed_error()); } // send message auto msg_id = dst.next_message_id(); - auto m = std::make_unique::type...>>(t, msg_id, std::forward(args)...); + auto m = std::make_unique::type...>>(t, msg_id, std::forward(args)...); auto xargs = std::tie(m->t, m->id, std::get(m->args)...); // holds references to all message elements promise<> sent; // will be fulfilled when data is sent auto fsent = sent.get_future(); @@ -232,10 +324,10 @@ template struct snd_reply : snd_reply_base { snd_reply(id_type xid) : snd_reply_base(xid) {} inline void set_val(std::tuple&& val) { - this->v = std::move(std::get<0>(val)); + this->value_set(std::move(std::get<0>(val))); } inline future<> reply(typename protocol::server::connection& client) { - return marshall(client.serializer(), client.out(), std::tie(this->id, this->v)); + return marshall(client.serializer(), client.out(), std::tie(this->id, this->u.v)); } }; @@ -243,10 +335,10 @@ template struct snd_reply> : snd_reply_base> { snd_reply(id_type xid) : snd_reply_base>(xid) {} inline void set_val(std::tuple&& val) { - this->v = std::move(val); + this->value_set(std::move(val)); } inline future<> reply(typename protocol::server::connection& client) { - return marshall(client.serializer(), client.out(), std::tuple_cat(std::tie(this->id), ref_tuple(this->v))); + return marshall(client.serializer(), client.out(), std::tuple_cat(std::tie(this->id), ref_tuple(this->u.v))); } }; @@ -287,17 +379,17 @@ inline future<> reply(std::unique_ptr>& r, t // build callback arguments tuple depending on whether it gets client_info as a first parameter template -inline auto make_apply_args(client_info& info, std::unique_ptr>& m, std::enable_if_t = nullptr) { - return std::move(m->args); +inline auto make_apply_args(client_info& info, std::unique_ptr>& m, std::enable_if_t = nullptr) { + return std::move(m->u.args); } template -inline auto make_apply_args(client_info& info, std::unique_ptr>& m, std::enable_if_t = nullptr) { - return std::tuple_cat(std::make_tuple(std::cref(info)), std::move(m->args)); +inline auto make_apply_args(client_info& info, std::unique_ptr>& m, std::enable_if_t = nullptr) { + return std::tuple_cat(std::make_tuple(std::cref(info)), std::move(m->u.args)); } template -inline future>> apply(Func& func, client_info& info, std::unique_ptr>&& m) { +inline future>> apply(Func& func, client_info& info, std::unique_ptr>&& m) { using futurator = futurize; auto r = std::make_unique>(m->id); try { @@ -335,9 +427,14 @@ template, Func&& func) { return [func = lref_to_cref(std::forward(func))](lw_shared_ptr::server::connection> client) mutable { // create message to hold all received values - auto m = std::make_unique::type...>>(); - auto xargs = std::tie(m->id, std::get(m->args)...); // holds reference to all message elements - return unmarshall(client->serializer(), client->in(), std::move(xargs)).then([client, m = std::move(m), &func] () mutable { + auto m = std::make_unique::type...>>(); + auto argready = [mptr = m.get()] (std::size_t n) { + if (n) { // skip first element since it is not part of a message tuple + mptr->set_ready(n - 1); + } + }; + auto xargs = std::tie(m->id, std::get(m->u.args)...); // holds reference to all message elements + return unmarshall(client->serializer(), client->in(), std::move(xargs), std::move(argready)).then([client, m = std::move(m), &func] () mutable { // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()" apply(func, client->info(), std::move(m)).then([client] (std::unique_ptr>&& r) { client->out_ready() = client->out_ready().then([client, r = std::move(r)] () mutable { @@ -488,6 +585,7 @@ protocol::client::client(protocol& pro } }); }).finally([this] () { + this->_error = true; this->_write_buf.close(); _outstanding.clear(); }); diff --git a/tests/sstring_test.cc b/tests/sstring_test.cc index 7a1fbc1135..21df8f4821 100644 --- a/tests/sstring_test.cc +++ b/tests/sstring_test.cc @@ -24,6 +24,7 @@ #include #include "core/sstring.hh" +#include BOOST_AUTO_TEST_CASE(test_equality) { BOOST_REQUIRE_EQUAL(sstring("aaa"), sstring("aaa")); @@ -121,3 +122,9 @@ BOOST_AUTO_TEST_CASE(test_erase) { BOOST_REQUIRE_EQUAL(str, "adef"); BOOST_REQUIRE_THROW(str.erase(str.begin() + 5, str.begin() + 6), std::out_of_range); } + +BOOST_AUTO_TEST_CASE(test_ctor_iterator) { + std::list data{{'a', 'b', 'c'}}; + sstring s(data.begin(), data.end()); + BOOST_REQUIRE_EQUAL(s, "abc"); +}