diff --git a/core/reactor.cc b/core/reactor.cc index 3f788a7b50..48eaccc083 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -104,7 +104,7 @@ class bsd_connected_socket_impl final : public connected_socket_impl { private: explicit bsd_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {} public: - virtual input_stream input() override { return input_stream(_fd, 8192); } + virtual input_stream input() override { return input_stream(bsd_data_source(_fd)); } virtual output_stream output() override { return output_stream(_fd, 8192); } friend class bsd_server_socket_impl; }; diff --git a/core/reactor.hh b/core/reactor.hh index ad90b2639b..0cb8800c87 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -380,7 +380,7 @@ class temporary_buffer { public: explicit temporary_buffer(size_t size) : _buffer(new CharType[size]), _size(size), _deleter(new internal_deleter(nullptr, _buffer, size)) {} - explicit temporary_buffer(CharType* borrow, size_t size) : _buffer(borrow), _size(size) {} + //explicit temporary_buffer(CharType* borrow, size_t size) : _buffer(borrow), _size(size) {} temporary_buffer() = delete; temporary_buffer(const temporary_buffer&) = delete; temporary_buffer(temporary_buffer&& x) : _buffer(x._buffer), _size(x._size), _deleter(std::move(x._deleter)) { @@ -465,38 +465,22 @@ data_source bsd_data_source(pollable_fd& fd); template class input_stream { static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); - pollable_fd& _fd; - std::unique_ptr _buf; - size_t _size; - size_t _begin = 0; - size_t _end = 0; + data_source _fd; + temporary_buffer _buf; + std::vector> _scanned; bool _eof = false; private: using tmp_buf = temporary_buffer; - size_t available() const { return _end - _begin; } - size_t possibly_available() const { return _size - _begin; } - tmp_buf allocate(size_t n) { - if (n <= possibly_available()) { - return tmp_buf(_buf.get() + _begin, n); - } else { - return tmp_buf(n); - } - } - void advance(size_t n) { - _begin += n; - if (_begin == _end) { - _begin = _end = 0; - } - } + size_t available() const { return _buf.size(); } public: using char_type = CharType; - input_stream(pollable_fd& fd, size_t size) : _fd(fd), _buf(new char_type[size]), _size(size) {} + explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {} future> read_exactly(size_t n); future> read_until(size_t limit, CharType eol); future> read_until(size_t limit, const CharType* eol, size_t eol_len); private: future> read_exactly_part(size_t n, tmp_buf buf, size_t completed); - future> read_until_part(size_t n, CharType eol, tmp_buf buf, size_t completed); + future> read_until_part(size_t n, CharType eol, size_t completed); }; template @@ -649,10 +633,8 @@ future> input_stream::read_exactly_part(size_t n, tmp_buf out, size_t completed) { if (available()) { auto now = std::min(n - completed, available()); - if (out.owned()) { - std::copy(_buf.get() + _begin, _buf.get() + _begin + now, out.get() + completed); - } - advance(now); + std::copy(_buf.get(), _buf.get() + now, out.get() + completed); + _buf.trim_front(now); completed += now; } if (completed == n) { @@ -660,74 +642,89 @@ input_stream::read_exactly_part(size_t n, tmp_buf out, size_t complete } // _buf is now empty - if (out.owned()) { - return _fd.read_some(out.get() + completed, n - completed).then( - [this, out = std::move(out), completed, n] (size_t now) mutable { - completed += now; - return read_exactly_part(n, std::move(out), completed); - }); - } else { - _fd.read_some(_buf.get(), _size).then( - [this, out = std::move(out), completed, n] (size_t now) mutable { - _end = now; - return read_exactly_part(n, std::move(out), completed); - }); - } + return _fd.get().then([this, n, out = std::move(out), completed] (char* data, size_t m, std::unique_ptr d) { + _buf = tmp_buf(data, m, std::move(d)); + return read_exactly_part(n, std::move(out), completed); + }); } template future> input_stream::read_exactly(size_t n) { - auto buf = allocate(n); - return read_exactly_part(n, buf, 0); + if (_buf.size() == n) { + // easy case: steal buffer, return to caller + return make_ready_future>(std::move(_buf)); + } else if (_buf.size() > n) { + // buffer large enough, share it with caller + auto front = _buf.share(0, n); + _buf.trim_front(n); + return make_ready_future>(front); + } else if (_buf.size() == 0) { + // buffer is empty: grab one and retry + return _fd.get().then([this, n] (char* data, size_t m, std::unique_ptr d) { + _buf = tmp_buf(data, m, std::move(d)); + return read_exactly(n); + }); + } else { + // buffer too small: start copy/read loop + temporary_buffer b(n); + return read_exactly_part(n, std::move(b), 0); + } } template future> -input_stream::read_until_part(size_t limit, CharType eol, tmp_buf out, - size_t completed) { - auto to_search = std::min(limit - completed, available()); - auto i = std::find(_buf.get() + _begin, _buf.get() + _begin + to_search, eol); - auto nr_found = i - (_buf.get() + _begin); - if (i != _buf.get() + _begin + to_search +input_stream::read_until_part(size_t limit, CharType eol, size_t completed) { + if (_buf.empty() && !_eof) { + return _fd.get().then([this, limit, eol, completed] (tmp_buf buf) { + _buf = std::move(buf); + if (_buf.empty()) { + _eof = true; + } + return read_until_part(limit, eol, completed); + }); + } + auto to_search = std::min(limit - completed, _buf.size()); + auto i = std::find(_buf.get(), _buf.get() + to_search, eol); + auto nr_found = i - _buf.get(); + if (i != _buf.get() + to_search || completed + nr_found == limit - || (i == _buf.get() + _end && _eof)) { - if (i != _buf.get() + _begin + to_search && completed + nr_found < limit) { + || (i == _buf.get() + _buf.size() && _eof)) { + if (i != _buf.get() + to_search && completed + nr_found < limit) { assert(*i == eol); ++i; // include eol in result ++nr_found; } - if (out.owning()) { - std::copy(_buf.get() + _begin, i, out.get_write() + completed); + if (_scanned.empty()) { + auto ret = _buf.share(0, nr_found); + _buf.trim_front(nr_found); + return make_ready_future(std::move(ret)); + } else { + tmp_buf out{completed + nr_found}; + auto p = out.get_write(); + for (auto&& b : _scanned) { + p = std::copy(b.get(), b.get() + b.size(), p); + } + std::copy(_buf.get(), _buf.get() + nr_found, p); + _buf.trim_front(nr_found); + _scanned.clear(); + return make_ready_future(std::move(out)); } - advance(nr_found); - completed += nr_found; - return make_ready_future(std::move(out).prefix(completed)); } else { - if (!out.owning() && _end == _size) { - // wrapping around, must allocate - auto new_out = tmp_buf(limit); - out = std::move(new_out); - } - if (out.owning()) { - std::copy(_buf.get() + _begin, _buf.get() + _end, out.get_write() + completed); - completed += _end - _begin; - _begin = _end = 0; - } - return _fd.read_some(_buf.get() + _end, _size - _end).then( - [this, limit, eol, out = std::move(out), completed] (size_t n) mutable { - _end += n; - _eof = n == 0; - return read_until_part(limit, eol, std::move(out), completed); - }); + _scanned.push_back(std::move(_buf)); + completed += nr_found; + return read_until_part(limit, eol, completed); } } template future> input_stream::read_until(size_t limit, CharType eol) { - return read_until_part(limit, eol, allocate(possibly_available()), 0); + if (limit == 0) { + return make_ready_future(tmp_buf(0)); + } + return read_until_part(limit, eol, 0); } #include