output_stream: support for output packet trimming

For UDP memcached we cannot generate arbitrarily large chunks, we need
to trim to datagram size. It's most efficient to split in the
output_stream.
This commit is contained in:
Tomasz Grabiec
2014-11-26 14:06:20 +01:00
parent 4b7c42a5c7
commit bcea3a67ca

View File

@@ -808,6 +808,14 @@ private:
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
};
// Facilitates data buffering before it's handed over to data_sink.
//
// When trim_to_size is true it's guaranteed that data sink will not receive
// chunks larger than the configured size, which could be the case when a
// single write call is made with data larger than the configured size.
//
// The data sink will not receive empty chunks.
//
template <typename CharType>
class output_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
@@ -816,13 +824,15 @@ class output_stream {
size_t _size;
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf);
public:
using char_type = CharType;
output_stream(data_sink fd, size_t size)
: _fd(std::move(fd)), _buf(size), _size(size) {}
output_stream(data_sink fd, size_t size, bool trim_to_size = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);
future<> write(const sstring& s);
@@ -1024,6 +1034,29 @@ input_stream<CharType>::consume(Consumer& consumer) {
#include <iostream>
#include "sstring.hh"
// Writes @buf in chunks of _size length. The last chunk, if smaller
// is buffered.
template <typename CharType>
future<>
output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
assert(_end == 0);
if (buf.size() < _size) {
if (!_buf) {
_buf = temporary_buffer<char>(_size);
}
std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
_end = buf.size();
return make_ready_future<>();
}
auto chunk = buf.share(0, _size);
buf.trim_front(_size);
return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable {
return split_and_put(std::move(buf));
});
}
template <typename CharType>
future<>
output_stream<CharType>::write(const char_type* buf, size_t n) {
@@ -1036,12 +1069,20 @@ output_stream<CharType>::write(const char_type* buf, size_t n) {
temporary_buffer<char> tmp(n - now);
std::copy(buf + now, buf + n, tmp.get_write());
return flush().then([this, tmp = std::move(tmp)]() mutable {
return _fd.put(std::move(tmp));
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
});
} else {
temporary_buffer<char> tmp(n);
std::copy(buf, buf + n, tmp.get_write());
return _fd.put(std::move(tmp));
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
}
}