mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 09:30:45 +00:00
core: switch input_stream to use data_source
With this change, input_stream can be used to either the hosted or the native stack (when it is plugged in).
This commit is contained in:
@@ -104,7 +104,7 @@ class bsd_connected_socket_impl final : public connected_socket_impl {
|
||||
private:
|
||||
explicit bsd_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {}
|
||||
public:
|
||||
virtual input_stream<char> input() override { return input_stream<char>(_fd, 8192); }
|
||||
virtual input_stream<char> input() override { return input_stream<char>(bsd_data_source(_fd)); }
|
||||
virtual output_stream<char> output() override { return output_stream<char>(_fd, 8192); }
|
||||
friend class bsd_server_socket_impl;
|
||||
};
|
||||
|
||||
141
core/reactor.hh
141
core/reactor.hh
@@ -380,7 +380,7 @@ class temporary_buffer {
|
||||
public:
|
||||
explicit temporary_buffer(size_t size)
|
||||
: _buffer(new CharType[size]), _size(size), _deleter(new internal_deleter(nullptr, _buffer, size)) {}
|
||||
explicit temporary_buffer(CharType* borrow, size_t size) : _buffer(borrow), _size(size) {}
|
||||
//explicit temporary_buffer(CharType* borrow, size_t size) : _buffer(borrow), _size(size) {}
|
||||
temporary_buffer() = delete;
|
||||
temporary_buffer(const temporary_buffer&) = delete;
|
||||
temporary_buffer(temporary_buffer&& x) : _buffer(x._buffer), _size(x._size), _deleter(std::move(x._deleter)) {
|
||||
@@ -465,38 +465,22 @@ data_source bsd_data_source(pollable_fd& fd);
|
||||
template <typename CharType>
|
||||
class input_stream {
|
||||
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
|
||||
pollable_fd& _fd;
|
||||
std::unique_ptr<CharType[]> _buf;
|
||||
size_t _size;
|
||||
size_t _begin = 0;
|
||||
size_t _end = 0;
|
||||
data_source _fd;
|
||||
temporary_buffer<CharType> _buf;
|
||||
std::vector<temporary_buffer<CharType>> _scanned;
|
||||
bool _eof = false;
|
||||
private:
|
||||
using tmp_buf = temporary_buffer<CharType>;
|
||||
size_t available() const { return _end - _begin; }
|
||||
size_t possibly_available() const { return _size - _begin; }
|
||||
tmp_buf allocate(size_t n) {
|
||||
if (n <= possibly_available()) {
|
||||
return tmp_buf(_buf.get() + _begin, n);
|
||||
} else {
|
||||
return tmp_buf(n);
|
||||
}
|
||||
}
|
||||
void advance(size_t n) {
|
||||
_begin += n;
|
||||
if (_begin == _end) {
|
||||
_begin = _end = 0;
|
||||
}
|
||||
}
|
||||
size_t available() const { return _buf.size(); }
|
||||
public:
|
||||
using char_type = CharType;
|
||||
input_stream(pollable_fd& fd, size_t size) : _fd(fd), _buf(new char_type[size]), _size(size) {}
|
||||
explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {}
|
||||
future<temporary_buffer<CharType>> read_exactly(size_t n);
|
||||
future<temporary_buffer<CharType>> read_until(size_t limit, CharType eol);
|
||||
future<temporary_buffer<CharType>> read_until(size_t limit, const CharType* eol, size_t eol_len);
|
||||
private:
|
||||
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
|
||||
future<temporary_buffer<CharType>> read_until_part(size_t n, CharType eol, tmp_buf buf, size_t completed);
|
||||
future<temporary_buffer<CharType>> read_until_part(size_t n, CharType eol, size_t completed);
|
||||
};
|
||||
|
||||
template <typename CharType>
|
||||
@@ -649,10 +633,8 @@ future<temporary_buffer<CharType>>
|
||||
input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
|
||||
if (available()) {
|
||||
auto now = std::min(n - completed, available());
|
||||
if (out.owned()) {
|
||||
std::copy(_buf.get() + _begin, _buf.get() + _begin + now, out.get() + completed);
|
||||
}
|
||||
advance(now);
|
||||
std::copy(_buf.get(), _buf.get() + now, out.get() + completed);
|
||||
_buf.trim_front(now);
|
||||
completed += now;
|
||||
}
|
||||
if (completed == n) {
|
||||
@@ -660,74 +642,89 @@ input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t complete
|
||||
}
|
||||
|
||||
// _buf is now empty
|
||||
if (out.owned()) {
|
||||
return _fd.read_some(out.get() + completed, n - completed).then(
|
||||
[this, out = std::move(out), completed, n] (size_t now) mutable {
|
||||
completed += now;
|
||||
return read_exactly_part(n, std::move(out), completed);
|
||||
});
|
||||
} else {
|
||||
_fd.read_some(_buf.get(), _size).then(
|
||||
[this, out = std::move(out), completed, n] (size_t now) mutable {
|
||||
_end = now;
|
||||
return read_exactly_part(n, std::move(out), completed);
|
||||
});
|
||||
}
|
||||
return _fd.get().then([this, n, out = std::move(out), completed] (char* data, size_t m, std::unique_ptr<deleter> d) {
|
||||
_buf = tmp_buf(data, m, std::move(d));
|
||||
return read_exactly_part(n, std::move(out), completed);
|
||||
});
|
||||
}
|
||||
|
||||
template <typename CharType>
|
||||
future<temporary_buffer<CharType>>
|
||||
input_stream<CharType>::read_exactly(size_t n) {
|
||||
auto buf = allocate(n);
|
||||
return read_exactly_part(n, buf, 0);
|
||||
if (_buf.size() == n) {
|
||||
// easy case: steal buffer, return to caller
|
||||
return make_ready_future<temporary_buffer<CharType>>(std::move(_buf));
|
||||
} else if (_buf.size() > n) {
|
||||
// buffer large enough, share it with caller
|
||||
auto front = _buf.share(0, n);
|
||||
_buf.trim_front(n);
|
||||
return make_ready_future<temporary_buffer<CharType>>(front);
|
||||
} else if (_buf.size() == 0) {
|
||||
// buffer is empty: grab one and retry
|
||||
return _fd.get().then([this, n] (char* data, size_t m, std::unique_ptr<deleter> d) {
|
||||
_buf = tmp_buf(data, m, std::move(d));
|
||||
return read_exactly(n);
|
||||
});
|
||||
} else {
|
||||
// buffer too small: start copy/read loop
|
||||
temporary_buffer<CharType> b(n);
|
||||
return read_exactly_part(n, std::move(b), 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <typename CharType>
|
||||
future<temporary_buffer<CharType>>
|
||||
input_stream<CharType>::read_until_part(size_t limit, CharType eol, tmp_buf out,
|
||||
size_t completed) {
|
||||
auto to_search = std::min(limit - completed, available());
|
||||
auto i = std::find(_buf.get() + _begin, _buf.get() + _begin + to_search, eol);
|
||||
auto nr_found = i - (_buf.get() + _begin);
|
||||
if (i != _buf.get() + _begin + to_search
|
||||
input_stream<CharType>::read_until_part(size_t limit, CharType eol, size_t completed) {
|
||||
if (_buf.empty() && !_eof) {
|
||||
return _fd.get().then([this, limit, eol, completed] (tmp_buf buf) {
|
||||
_buf = std::move(buf);
|
||||
if (_buf.empty()) {
|
||||
_eof = true;
|
||||
}
|
||||
return read_until_part(limit, eol, completed);
|
||||
});
|
||||
}
|
||||
auto to_search = std::min(limit - completed, _buf.size());
|
||||
auto i = std::find(_buf.get(), _buf.get() + to_search, eol);
|
||||
auto nr_found = i - _buf.get();
|
||||
if (i != _buf.get() + to_search
|
||||
|| completed + nr_found == limit
|
||||
|| (i == _buf.get() + _end && _eof)) {
|
||||
if (i != _buf.get() + _begin + to_search && completed + nr_found < limit) {
|
||||
|| (i == _buf.get() + _buf.size() && _eof)) {
|
||||
if (i != _buf.get() + to_search && completed + nr_found < limit) {
|
||||
assert(*i == eol);
|
||||
++i; // include eol in result
|
||||
++nr_found;
|
||||
}
|
||||
if (out.owning()) {
|
||||
std::copy(_buf.get() + _begin, i, out.get_write() + completed);
|
||||
if (_scanned.empty()) {
|
||||
auto ret = _buf.share(0, nr_found);
|
||||
_buf.trim_front(nr_found);
|
||||
return make_ready_future<tmp_buf>(std::move(ret));
|
||||
} else {
|
||||
tmp_buf out{completed + nr_found};
|
||||
auto p = out.get_write();
|
||||
for (auto&& b : _scanned) {
|
||||
p = std::copy(b.get(), b.get() + b.size(), p);
|
||||
}
|
||||
std::copy(_buf.get(), _buf.get() + nr_found, p);
|
||||
_buf.trim_front(nr_found);
|
||||
_scanned.clear();
|
||||
return make_ready_future<tmp_buf>(std::move(out));
|
||||
}
|
||||
advance(nr_found);
|
||||
completed += nr_found;
|
||||
return make_ready_future<tmp_buf>(std::move(out).prefix(completed));
|
||||
} else {
|
||||
if (!out.owning() && _end == _size) {
|
||||
// wrapping around, must allocate
|
||||
auto new_out = tmp_buf(limit);
|
||||
out = std::move(new_out);
|
||||
}
|
||||
if (out.owning()) {
|
||||
std::copy(_buf.get() + _begin, _buf.get() + _end, out.get_write() + completed);
|
||||
completed += _end - _begin;
|
||||
_begin = _end = 0;
|
||||
}
|
||||
return _fd.read_some(_buf.get() + _end, _size - _end).then(
|
||||
[this, limit, eol, out = std::move(out), completed] (size_t n) mutable {
|
||||
_end += n;
|
||||
_eof = n == 0;
|
||||
return read_until_part(limit, eol, std::move(out), completed);
|
||||
});
|
||||
_scanned.push_back(std::move(_buf));
|
||||
completed += nr_found;
|
||||
return read_until_part(limit, eol, completed);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename CharType>
|
||||
future<temporary_buffer<CharType>>
|
||||
input_stream<CharType>::read_until(size_t limit, CharType eol) {
|
||||
return read_until_part(limit, eol, allocate(possibly_available()), 0);
|
||||
if (limit == 0) {
|
||||
return make_ready_future<tmp_buf>(tmp_buf(0));
|
||||
}
|
||||
return read_until_part(limit, eol, 0);
|
||||
}
|
||||
|
||||
#include <iostream>
|
||||
|
||||
Reference in New Issue
Block a user