diff --git a/core/iostream-impl.hh b/core/iostream-impl.hh new file mode 100644 index 0000000000..6d62438d4f --- /dev/null +++ b/core/iostream-impl.hh @@ -0,0 +1,224 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + + +#pragma once + +#include "net/packet.hh" + +template +inline +future<> output_stream::write(const char_type* buf) { + return write(buf, strlen(buf)); +} + +template +inline +future<> output_stream::write(const sstring& s) { + return write(s.c_str(), s.size()); +} + +template +future<> output_stream::write(scattered_message msg) { + return write(std::move(msg).release()); +} + +template +future<> output_stream::write(net::packet p) { + static_assert(std::is_same::value, "packet works on char"); + + if (p.len() == 0) { + return make_ready_future<>(); + } + + assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet"); + + if (!_trim_to_size || p.len() <= _size) { + // TODO: aggregate buffers for later coalescing. Currently we flush right + // after appending the message anyway, so it doesn't matter. + return _fd.put(std::move(p)); + } + + auto head = p.share(0, _size); + p.trim_front(_size); + return _fd.put(std::move(head)).then([this, p = std::move(p)] () mutable { + return write(std::move(p)); + }); +} + +template +future> +input_stream::read_exactly_part(size_t n, tmp_buf out, size_t completed) { + if (available()) { + auto now = std::min(n - completed, available()); + std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed); + _buf.trim_front(now); + completed += now; + } + if (completed == n) { + return make_ready_future(std::move(out)); + } + + // _buf is now empty + return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable { + if (buf.size() == 0) { + return make_ready_future(std::move(buf)); + } + _buf = std::move(buf); + return this->read_exactly_part(n, std::move(out), completed); + }); +} + +template +future> +input_stream::read_exactly(size_t n) { + if (_buf.size() == n) { + // easy case: steal buffer, return to caller + return make_ready_future(std::move(_buf)); + } else if (_buf.size() > n) { + // buffer large enough, share it with caller + auto front = _buf.share(0, n); + _buf.trim_front(n); + return make_ready_future(std::move(front)); + } else if (_buf.size() == 0) { + // buffer is empty: grab one and retry + return _fd.get().then([this, n] (auto buf) mutable { + if (buf.size() == 0) { + return make_ready_future(std::move(buf)); + } + _buf = std::move(buf); + return this->read_exactly(n); + }); + } else { + // buffer too small: start copy/read loop + tmp_buf b(n); + return read_exactly_part(n, std::move(b), 0); + } +} + +template +template +future<> +input_stream::consume(Consumer& consumer) { + if (_buf.empty() && !_eof) { + return _fd.get().then([this, &consumer] (tmp_buf buf) { + _buf = std::move(buf); + _eof = _buf.empty(); + return consume(consumer); + }); + } else { + auto tmp = std::move(_buf); + bool done = tmp.empty(); + consumer(std::move(tmp), [this, &done] (tmp_buf unconsumed) { + done = true; + if (!unconsumed.empty()) { + _buf = std::move(unconsumed); + } + }); + if (!done) { + return consume(consumer); + } else { + return make_ready_future<>(); + } + } +} + +// Writes @buf in chunks of _size length. The last chunk is buffered if smaller. +template +future<> +output_stream::split_and_put(temporary_buffer buf) { + assert(_end == 0); + + if (buf.size() < _size) { + if (!_buf) { + _buf = temporary_buffer(_size); + } + std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write()); + _end = buf.size(); + return make_ready_future<>(); + } + + auto chunk = buf.share(0, _size); + buf.trim_front(_size); + return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { + return split_and_put(std::move(buf)); + }); +} + +template +future<> +output_stream::write(const char_type* buf, size_t n) { + auto bulk_threshold = _end ? (2 * _size - _end) : _size; + if (n >= bulk_threshold) { + if (_end) { + auto now = _size - _end; + std::copy(buf, buf + now, _buf.get_write() + _end); + _end = _size; + temporary_buffer tmp(n - now); + std::copy(buf + now, buf + n, tmp.get_write()); + return flush().then([this, tmp = std::move(tmp)]() mutable { + if (_trim_to_size) { + return split_and_put(std::move(tmp)); + } else { + return _fd.put(std::move(tmp)); + } + }); + } else { + temporary_buffer tmp(n); + std::copy(buf, buf + n, tmp.get_write()); + if (_trim_to_size) { + return split_and_put(std::move(tmp)); + } else { + return _fd.put(std::move(tmp)); + } + } + } + + if (!_buf) { + _buf = temporary_buffer(_size); + } + + auto now = std::min(n, _size - _end); + std::copy(buf, buf + now, _buf.get_write() + _end); + _end += now; + if (now == n) { + return make_ready_future<>(); + } else { + temporary_buffer next(_size); + std::copy(buf + now, buf + n, next.get_write()); + _end = n - now; + std::swap(next, _buf); + return _fd.put(std::move(next)); + } +} + +template +future<> +output_stream::flush() { + if (!_end) { + return make_ready_future<>(); + } + _buf.trim(_end); + _end = 0; + return _fd.put(std::move(_buf)); +} + diff --git a/core/iostream.hh b/core/iostream.hh new file mode 100644 index 0000000000..b2eb9061d3 --- /dev/null +++ b/core/iostream.hh @@ -0,0 +1,161 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +// +// Buffered input and output streams +// +// Two abstract classes (data_source and data_sink) provide means +// to acquire bulk data from, or push bulk data to, some provider. +// These could be tied to a TCP connection, a disk file, or a memory +// buffer. +// +// Two concrete classes (input_stream and output_stream) buffer data +// from data_source and data_sink and provide easier means to process +// it. +// + +#pragma once + +#include "future.hh" +#include "temporary_buffer.hh" +#include "scattered_message.hh" + +namespace net { class packet; } + +class data_source_impl { +public: + virtual ~data_source_impl() {} + virtual future> get() = 0; +}; + +class data_source { + std::unique_ptr _dsi; +protected: + data_source_impl* impl() const { return _dsi.get(); } +public: + explicit data_source(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} + data_source(data_source&& x) = default; + future> get() { return _dsi->get(); } +}; + +class data_sink_impl { +public: + virtual ~data_sink_impl() {} + virtual future<> put(net::packet data) = 0; + virtual future<> put(std::vector> data) { + net::packet p; + p.reserve(data.size()); + for (auto& buf : data) { + p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release()); + } + return put(std::move(p)); + } + virtual future<> put(temporary_buffer buf) { + return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release())); + } + virtual future<> close() = 0; +}; + +class data_sink { + std::unique_ptr _dsi; +public: + explicit data_sink(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} + data_sink(data_sink&& x) = default; + future<> put(std::vector> data) { + return _dsi->put(std::move(data)); + } + future<> put(temporary_buffer data) { + return _dsi->put(std::move(data)); + } + future<> put(net::packet p) { + return _dsi->put(std::move(p)); + } + future<> close() { return _dsi->close(); } +}; + +template +class input_stream { + static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); + data_source _fd; + temporary_buffer _buf; + bool _eof = false; +private: + using tmp_buf = temporary_buffer; + size_t available() const { return _buf.size(); } +protected: + void reset() { _buf = {}; } + data_source* fd() { return &_fd; } +public: + // Consumer concept, for consume() method: + struct ConsumerConcept { + // call done(tmp_buf) to signal end of processing. tmp_buf parameter to + // done is unconsumed data + template + void operator()(tmp_buf data, Done done); + }; + using char_type = CharType; + explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {} + future> read_exactly(size_t n); + template + future<> consume(Consumer& c); + bool eof() { return _eof; } +private: + future> read_exactly_part(size_t n, tmp_buf buf, size_t completed); +}; + +// Facilitates data buffering before it's handed over to data_sink. +// +// When trim_to_size is true it's guaranteed that data sink will not receive +// chunks larger than the configured size, which could be the case when a +// single write call is made with data larger than the configured size. +// +// The data sink will not receive empty chunks. +// +template +class output_stream { + static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); + data_sink _fd; + temporary_buffer _buf; + size_t _size; + size_t _begin = 0; + size_t _end = 0; + bool _trim_to_size; +private: + size_t available() const { return _end - _begin; } + size_t possibly_available() const { return _size - _begin; } + future<> split_and_put(temporary_buffer buf); +public: + using char_type = CharType; + output_stream(data_sink fd, size_t size, bool trim_to_size = false) + : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} + future<> write(const char_type* buf, size_t n); + future<> write(const char_type* buf); + future<> write(const sstring& s); + future<> write(net::packet p); + future<> write(scattered_message msg); + future<> flush(); + future<> close() { return _fd.close(); } +private: +}; + + +#include "iostream-impl.hh" diff --git a/core/reactor.hh b/core/reactor.hh index 333184aa4c..40a7993d40 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -22,6 +22,7 @@ #ifndef REACTOR_HH_ #define REACTOR_HH_ +#include "iostream.hh" #include #include #include @@ -910,162 +911,6 @@ pollable_fd_state::~pollable_fd_state() { engine().forget(*this); } -class data_source_impl { -public: - virtual ~data_source_impl() {} - virtual future> get() = 0; -}; - -class data_source { - std::unique_ptr _dsi; -protected: - data_source_impl* impl() const { return _dsi.get(); } -public: - explicit data_source(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} - data_source(data_source&& x) = default; - future> get() { return _dsi->get(); } -}; - -class data_sink_impl { -public: - virtual ~data_sink_impl() {} - virtual future<> put(net::packet data) = 0; - virtual future<> put(std::vector> data) { - net::packet p; - p.reserve(data.size()); - for (auto& buf : data) { - p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release()); - } - return put(std::move(p)); - } - virtual future<> put(temporary_buffer buf) { - return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release())); - } - virtual future<> close() = 0; -}; - -class data_sink { - std::unique_ptr _dsi; -public: - explicit data_sink(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} - data_sink(data_sink&& x) = default; - future<> put(std::vector> data) { - return _dsi->put(std::move(data)); - } - future<> put(temporary_buffer data) { - return _dsi->put(std::move(data)); - } - future<> put(net::packet p) { - return _dsi->put(std::move(p)); - } - future<> close() { return _dsi->close(); } -}; - -template -class input_stream { - static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); - data_source _fd; - temporary_buffer _buf; - bool _eof = false; -private: - using tmp_buf = temporary_buffer; - size_t available() const { return _buf.size(); } -protected: - void reset() { _buf = {}; } - data_source* fd() { return &_fd; } -public: - // Consumer concept, for consume() method: - struct ConsumerConcept { - // call done(tmp_buf) to signal end of processing. tmp_buf parameter to - // done is unconsumed data - template - void operator()(tmp_buf data, Done done); - }; - using char_type = CharType; - explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {} - future> read_exactly(size_t n); - template - future<> consume(Consumer& c); - bool eof() { return _eof; } -private: - future> read_exactly_part(size_t n, tmp_buf buf, size_t completed); -}; - -// Facilitates data buffering before it's handed over to data_sink. -// -// When trim_to_size is true it's guaranteed that data sink will not receive -// chunks larger than the configured size, which could be the case when a -// single write call is made with data larger than the configured size. -// -// The data sink will not receive empty chunks. -// -template -class output_stream { - static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); - data_sink _fd; - temporary_buffer _buf; - size_t _size; - size_t _begin = 0; - size_t _end = 0; - bool _trim_to_size; -private: - size_t available() const { return _end - _begin; } - size_t possibly_available() const { return _size - _begin; } - future<> split_and_put(temporary_buffer buf); -public: - using char_type = CharType; - output_stream(data_sink fd, size_t size, bool trim_to_size = false) - : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} - future<> write(const char_type* buf, size_t n); - future<> write(const char_type* buf); - future<> write(const sstring& s); - future<> write(net::packet p); - future<> write(scattered_message msg); - future<> flush(); - future<> close() { return _fd.close(); } -private: -}; - -template -inline -future<> output_stream::write(const char_type* buf) { - return write(buf, strlen(buf)); -} - -template -inline -future<> output_stream::write(const sstring& s) { - return write(s.c_str(), s.size()); -} - -template -future<> output_stream::write(scattered_message msg) { - return write(std::move(msg).release()); -} - -template -future<> output_stream::write(net::packet p) { - static_assert(std::is_same::value, "packet works on char"); - - if (p.len() == 0) { - return make_ready_future<>(); - } - - assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet"); - - if (!_trim_to_size || p.len() <= _size) { - // TODO: aggregate buffers for later coalescing. Currently we flush right - // after appending the message anyway, so it doesn't matter. - return _fd.put(std::move(p)); - } - - auto head = p.share(0, _size); - p.trim_front(_size); - return _fd.put(std::move(head)).then([this, p = std::move(p)] () mutable { - return write(std::move(p)); - }); -} - inline size_t iovec_len(const iovec* begin, size_t len) { @@ -1178,166 +1023,9 @@ void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_ enable_fn(); } -template -future> -input_stream::read_exactly_part(size_t n, tmp_buf out, size_t completed) { - if (available()) { - auto now = std::min(n - completed, available()); - std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed); - _buf.trim_front(now); - completed += now; - } - if (completed == n) { - return make_ready_future(std::move(out)); - } - - // _buf is now empty - return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable { - if (buf.size() == 0) { - return make_ready_future(std::move(buf)); - } - _buf = std::move(buf); - return this->read_exactly_part(n, std::move(out), completed); - }); -} - -template -future> -input_stream::read_exactly(size_t n) { - if (_buf.size() == n) { - // easy case: steal buffer, return to caller - return make_ready_future(std::move(_buf)); - } else if (_buf.size() > n) { - // buffer large enough, share it with caller - auto front = _buf.share(0, n); - _buf.trim_front(n); - return make_ready_future(std::move(front)); - } else if (_buf.size() == 0) { - // buffer is empty: grab one and retry - return _fd.get().then([this, n] (auto buf) mutable { - if (buf.size() == 0) { - return make_ready_future(std::move(buf)); - } - _buf = std::move(buf); - return this->read_exactly(n); - }); - } else { - // buffer too small: start copy/read loop - tmp_buf b(n); - return read_exactly_part(n, std::move(b), 0); - } -} - -template -template -future<> -input_stream::consume(Consumer& consumer) { - if (_buf.empty() && !_eof) { - return _fd.get().then([this, &consumer] (tmp_buf buf) { - _buf = std::move(buf); - _eof = _buf.empty(); - return consume(consumer); - }); - } else { - auto tmp = std::move(_buf); - bool done = tmp.empty(); - consumer(std::move(tmp), [this, &done] (tmp_buf unconsumed) { - done = true; - if (!unconsumed.empty()) { - _buf = std::move(unconsumed); - } - }); - if (!done) { - return consume(consumer); - } else { - return make_ready_future<>(); - } - } -} - #include #include "sstring.hh" -// Writes @buf in chunks of _size length. The last chunk is buffered if smaller. -template -future<> -output_stream::split_and_put(temporary_buffer buf) { - assert(_end == 0); - - if (buf.size() < _size) { - if (!_buf) { - _buf = temporary_buffer(_size); - } - std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write()); - _end = buf.size(); - return make_ready_future<>(); - } - - auto chunk = buf.share(0, _size); - buf.trim_front(_size); - return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { - return split_and_put(std::move(buf)); - }); -} - -template -future<> -output_stream::write(const char_type* buf, size_t n) { - auto bulk_threshold = _end ? (2 * _size - _end) : _size; - if (n >= bulk_threshold) { - if (_end) { - auto now = _size - _end; - std::copy(buf, buf + now, _buf.get_write() + _end); - _end = _size; - temporary_buffer tmp(n - now); - std::copy(buf + now, buf + n, tmp.get_write()); - return flush().then([this, tmp = std::move(tmp)]() mutable { - if (_trim_to_size) { - return split_and_put(std::move(tmp)); - } else { - return _fd.put(std::move(tmp)); - } - }); - } else { - temporary_buffer tmp(n); - std::copy(buf, buf + n, tmp.get_write()); - if (_trim_to_size) { - return split_and_put(std::move(tmp)); - } else { - return _fd.put(std::move(tmp)); - } - } - } - - if (!_buf) { - _buf = temporary_buffer(_size); - } - - auto now = std::min(n, _size - _end); - std::copy(buf, buf + now, _buf.get_write() + _end); - _end += now; - if (now == n) { - return make_ready_future<>(); - } else { - temporary_buffer next(_size); - std::copy(buf + now, buf + n, next.get_write()); - _end = n - now; - std::swap(next, _buf); - return _fd.put(std::move(next)); - } -} - -template -future<> -output_stream::flush() { - if (!_end) { - return make_ready_future<>(); - } - _buf.trim(_end); - _end = 0; - return _fd.put(std::move(_buf)); -} - inline future pollable_fd::read_some(char* buffer, size_t size) { return engine().read_some(*_s, buffer, size); diff --git a/core/scattered_message.hh b/core/scattered_message.hh index b831c896a6..1b00e661d5 100644 --- a/core/scattered_message.hh +++ b/core/scattered_message.hh @@ -26,6 +26,7 @@ #include "core/deleter.hh" #include "core/temporary_buffer.hh" #include "net/packet.hh" +#include "sstring.hh" #include #include