From bcea3a67cadf89b74d26fd3b281fbc896b6ffaa5 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 26 Nov 2014 14:06:20 +0100 Subject: [PATCH] output_stream: support for output packet trimming For UDP memcached we cannot generate arbitrarily large chunks, we need to trim to datagram size. It's most efficient to split in the output_stream. --- core/reactor.hh | 49 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/core/reactor.hh b/core/reactor.hh index d34243a1cc..11d120a3b6 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -808,6 +808,14 @@ private: future> read_exactly_part(size_t n, tmp_buf buf, size_t completed); }; +// Facilitates data buffering before it's handed over to data_sink. +// +// When trim_to_size is true it's guaranteed that data sink will not receive +// chunks larger than the configured size, which could be the case when a +// single write call is made with data larger than the configured size. +// +// The data sink will not receive empty chunks. +// template class output_stream { static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); @@ -816,13 +824,15 @@ class output_stream { size_t _size; size_t _begin = 0; size_t _end = 0; + bool _trim_to_size; private: size_t available() const { return _end - _begin; } size_t possibly_available() const { return _size - _begin; } + future<> split_and_put(temporary_buffer buf); public: using char_type = CharType; - output_stream(data_sink fd, size_t size) - : _fd(std::move(fd)), _buf(size), _size(size) {} + output_stream(data_sink fd, size_t size, bool trim_to_size = false) + : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} future<> write(const char_type* buf, size_t n); future<> write(const char_type* buf); future<> write(const sstring& s); @@ -1024,6 +1034,29 @@ input_stream::consume(Consumer& consumer) { #include #include "sstring.hh" +// Writes @buf in chunks of _size length. The last chunk, if smaller +// is buffered. +template +future<> +output_stream::split_and_put(temporary_buffer buf) { + assert(_end == 0); + + if (buf.size() < _size) { + if (!_buf) { + _buf = temporary_buffer(_size); + } + std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write()); + _end = buf.size(); + return make_ready_future<>(); + } + + auto chunk = buf.share(0, _size); + buf.trim_front(_size); + return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { + return split_and_put(std::move(buf)); + }); +} + template future<> output_stream::write(const char_type* buf, size_t n) { @@ -1036,12 +1069,20 @@ output_stream::write(const char_type* buf, size_t n) { temporary_buffer tmp(n - now); std::copy(buf + now, buf + n, tmp.get_write()); return flush().then([this, tmp = std::move(tmp)]() mutable { - return _fd.put(std::move(tmp)); + if (_trim_to_size) { + return split_and_put(std::move(tmp)); + } else { + return _fd.put(std::move(tmp)); + } }); } else { temporary_buffer tmp(n); std::copy(buf, buf + n, tmp.get_write()); - return _fd.put(std::move(tmp)); + if (_trim_to_size) { + return split_and_put(std::move(tmp)); + } else { + return _fd.put(std::move(tmp)); + } } }