rpc: allocate header space together with payload

This commit is contained in:
Gleb Natapov
2015-07-18 21:08:20 +03:00
committed by Avi Kivity
parent 95970cfda0
commit 605e5c04ab

View File

@@ -149,18 +149,18 @@ public:
class simple_output_stream {
char* _p;
public:
simple_output_stream(sstring& s) : _p(s.begin()) {}
simple_output_stream(sstring& s, size_t start) : _p(s.begin() + start) {}
void write(const char* data, size_t size) {
_p = std::copy_n(data, size, _p);
}
};
template <typename Serializer, typename... T>
inline sstring marshall(Serializer& serializer, const T&... args) {
inline sstring marshall(Serializer& serializer, size_t head_space, const T&... args) {
measuring_output_stream measure;
do_marshall(serializer, measure, args...);
sstring ret(sstring::initialized_later(), measure.size());
simple_output_stream out(ret);
sstring ret(sstring::initialized_later(), measure.size() + head_space);
simple_output_stream out(ret, head_space);
do_marshall(serializer, out, args...);
return ret;
}
@@ -285,19 +285,15 @@ auto send_helper(MsgType t, signature<Ret (InArgs...)> sig) {
promise<> sent; // will be fulfilled when data is sent
auto fsent = sent.get_future();
dst.get_stats_internal().pending++;
sstring data = marshall(dst.serializer(), args...);
sstring header(sstring::initialized_later(), 24);
auto p = header.begin();
sstring data = marshall(dst.serializer(), 24, args...);
auto p = data.begin();
*unaligned_cast<uint64_t*>(p) = net::hton(uint64_t(t));
*unaligned_cast<int64_t*>(p + 8) = net::hton(msg_id);
*unaligned_cast<uint64_t*>(p + 16) = net::hton(data.size());
dst.out_ready() = do_with(std::move(header), std::move(data),
[&dst] (sstring& header, sstring& data) {
return dst.out_ready().then([&dst, &header, &data] () mutable {
return dst.out().write(header).then([&dst, &data] {
return dst.out().write(data).then([&dst] {
return dst.out().flush();
});
*unaligned_cast<uint64_t*>(p + 16) = net::hton(data.size() - 24);
dst.out_ready() = do_with(std::move(data), [&dst] (sstring& data) {
return dst.out_ready().then([&dst, &data] () mutable {
return dst.out().write(data).then([&dst] {
return dst.out().flush();
});
});
}).finally([&dst, sent = std::move(sent)] () mutable {
@@ -316,14 +312,12 @@ template <typename Serializer, typename MsgType>
inline
future<>
protocol<Serializer, MsgType>::server::connection::respond(int64_t msg_id, sstring&& data) {
std::array<char, 16> header;
*unaligned_cast<int64_t*>(header.data()) = net::hton(msg_id);
*unaligned_cast<uint64_t*>(header.data() + 8) = net::hton(data.size());
return do_with(header, std::move(data), [this, msg_id] (auto& header, const sstring& data) {
return this->out().write(header.data(), header.size()).then([this, &data] {
return this->out().write(data.begin(), data.size()).then([this] {
return this->out().flush();
});
auto p = data.begin();
*unaligned_cast<int64_t*>(p) = net::hton(msg_id);
*unaligned_cast<uint64_t*>(p + 8) = net::hton(data.size() - 16);
return do_with(std::move(data), [this, msg_id] (const sstring& data) {
return this->out().write(data.begin(), data.size()).then([this] {
return this->out().flush();
});
});
}
@@ -333,10 +327,10 @@ inline future<> reply(wait_type, future<RetTypes...>&& r, int64_t msgid, typenam
try {
auto&& data = r.get();
auto str = ::apply(marshall<Serializer, const RetTypes&...>,
std::tuple_cat(std::make_tuple(std::ref(client.serializer())), std::move(data)));
std::tuple_cat(std::make_tuple(std::ref(client.serializer()), 16), std::move(data)));
return client.respond(msgid, std::move(str));
} catch (std::exception& ex) {
return client.respond(-msgid, ex.what());
return client.respond(-msgid, sstring(sstring::initialized_later(), 16) + ex.what());
}
}