Move input_stream and output_stream to their own header

This commit is contained in:
Avi Kivity
2015-02-23 10:15:19 +02:00
parent c35b61bda5
commit fbf8844079
4 changed files with 387 additions and 313 deletions

224
core/iostream-impl.hh Normal file
View File

@@ -0,0 +1,224 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include "net/packet.hh"
template<typename CharType>
inline
future<> output_stream<CharType>::write(const char_type* buf) {
return write(buf, strlen(buf));
}
template<typename CharType>
inline
future<> output_stream<CharType>::write(const sstring& s) {
return write(s.c_str(), s.size());
}
template<typename CharType>
future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
return write(std::move(msg).release());
}
template<typename CharType>
future<> output_stream<CharType>::write(net::packet p) {
static_assert(std::is_same<CharType, char>::value, "packet works on char");
if (p.len() == 0) {
return make_ready_future<>();
}
assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
if (!_trim_to_size || p.len() <= _size) {
// TODO: aggregate buffers for later coalescing. Currently we flush right
// after appending the message anyway, so it doesn't matter.
return _fd.put(std::move(p));
}
auto head = p.share(0, _size);
p.trim_front(_size);
return _fd.put(std::move(head)).then([this, p = std::move(p)] () mutable {
return write(std::move(p));
});
}
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
if (available()) {
auto now = std::min(n - completed, available());
std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
_buf.trim_front(now);
completed += now;
}
if (completed == n) {
return make_ready_future<tmp_buf>(std::move(out));
}
// _buf is now empty
return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
if (buf.size() == 0) {
return make_ready_future<tmp_buf>(std::move(buf));
}
_buf = std::move(buf);
return this->read_exactly_part(n, std::move(out), completed);
});
}
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly(size_t n) {
if (_buf.size() == n) {
// easy case: steal buffer, return to caller
return make_ready_future<tmp_buf>(std::move(_buf));
} else if (_buf.size() > n) {
// buffer large enough, share it with caller
auto front = _buf.share(0, n);
_buf.trim_front(n);
return make_ready_future<tmp_buf>(std::move(front));
} else if (_buf.size() == 0) {
// buffer is empty: grab one and retry
return _fd.get().then([this, n] (auto buf) mutable {
if (buf.size() == 0) {
return make_ready_future<tmp_buf>(std::move(buf));
}
_buf = std::move(buf);
return this->read_exactly(n);
});
} else {
// buffer too small: start copy/read loop
tmp_buf b(n);
return read_exactly_part(n, std::move(b), 0);
}
}
template <typename CharType>
template <typename Consumer>
future<>
input_stream<CharType>::consume(Consumer& consumer) {
if (_buf.empty() && !_eof) {
return _fd.get().then([this, &consumer] (tmp_buf buf) {
_buf = std::move(buf);
_eof = _buf.empty();
return consume(consumer);
});
} else {
auto tmp = std::move(_buf);
bool done = tmp.empty();
consumer(std::move(tmp), [this, &done] (tmp_buf unconsumed) {
done = true;
if (!unconsumed.empty()) {
_buf = std::move(unconsumed);
}
});
if (!done) {
return consume(consumer);
} else {
return make_ready_future<>();
}
}
}
// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
template <typename CharType>
future<>
output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
assert(_end == 0);
if (buf.size() < _size) {
if (!_buf) {
_buf = temporary_buffer<char>(_size);
}
std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
_end = buf.size();
return make_ready_future<>();
}
auto chunk = buf.share(0, _size);
buf.trim_front(_size);
return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable {
return split_and_put(std::move(buf));
});
}
template <typename CharType>
future<>
output_stream<CharType>::write(const char_type* buf, size_t n) {
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
if (n >= bulk_threshold) {
if (_end) {
auto now = _size - _end;
std::copy(buf, buf + now, _buf.get_write() + _end);
_end = _size;
temporary_buffer<char> tmp(n - now);
std::copy(buf + now, buf + n, tmp.get_write());
return flush().then([this, tmp = std::move(tmp)]() mutable {
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
});
} else {
temporary_buffer<char> tmp(n);
std::copy(buf, buf + n, tmp.get_write());
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
}
}
if (!_buf) {
_buf = temporary_buffer<char>(_size);
}
auto now = std::min(n, _size - _end);
std::copy(buf, buf + now, _buf.get_write() + _end);
_end += now;
if (now == n) {
return make_ready_future<>();
} else {
temporary_buffer<CharType> next(_size);
std::copy(buf + now, buf + n, next.get_write());
_end = n - now;
std::swap(next, _buf);
return _fd.put(std::move(next));
}
}
template <typename CharType>
future<>
output_stream<CharType>::flush() {
if (!_end) {
return make_ready_future<>();
}
_buf.trim(_end);
_end = 0;
return _fd.put(std::move(_buf));
}

161
core/iostream.hh Normal file
View File

@@ -0,0 +1,161 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
//
// Buffered input and output streams
//
// Two abstract classes (data_source and data_sink) provide means
// to acquire bulk data from, or push bulk data to, some provider.
// These could be tied to a TCP connection, a disk file, or a memory
// buffer.
//
// Two concrete classes (input_stream and output_stream) buffer data
// from data_source and data_sink and provide easier means to process
// it.
//
#pragma once
#include "future.hh"
#include "temporary_buffer.hh"
#include "scattered_message.hh"
namespace net { class packet; }
class data_source_impl {
public:
virtual ~data_source_impl() {}
virtual future<temporary_buffer<char>> get() = 0;
};
class data_source {
std::unique_ptr<data_source_impl> _dsi;
protected:
data_source_impl* impl() const { return _dsi.get(); }
public:
explicit data_source(std::unique_ptr<data_source_impl> dsi) : _dsi(std::move(dsi)) {}
data_source(data_source&& x) = default;
future<temporary_buffer<char>> get() { return _dsi->get(); }
};
class data_sink_impl {
public:
virtual ~data_sink_impl() {}
virtual future<> put(net::packet data) = 0;
virtual future<> put(std::vector<temporary_buffer<char>> data) {
net::packet p;
p.reserve(data.size());
for (auto& buf : data) {
p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
}
return put(std::move(p));
}
virtual future<> put(temporary_buffer<char> buf) {
return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
}
virtual future<> close() = 0;
};
class data_sink {
std::unique_ptr<data_sink_impl> _dsi;
public:
explicit data_sink(std::unique_ptr<data_sink_impl> dsi) : _dsi(std::move(dsi)) {}
data_sink(data_sink&& x) = default;
future<> put(std::vector<temporary_buffer<char>> data) {
return _dsi->put(std::move(data));
}
future<> put(temporary_buffer<char> data) {
return _dsi->put(std::move(data));
}
future<> put(net::packet p) {
return _dsi->put(std::move(p));
}
future<> close() { return _dsi->close(); }
};
template <typename CharType>
class input_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_source _fd;
temporary_buffer<CharType> _buf;
bool _eof = false;
private:
using tmp_buf = temporary_buffer<CharType>;
size_t available() const { return _buf.size(); }
protected:
void reset() { _buf = {}; }
data_source* fd() { return &_fd; }
public:
// Consumer concept, for consume() method:
struct ConsumerConcept {
// call done(tmp_buf) to signal end of processing. tmp_buf parameter to
// done is unconsumed data
template <typename Done>
void operator()(tmp_buf data, Done done);
};
using char_type = CharType;
explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {}
future<temporary_buffer<CharType>> read_exactly(size_t n);
template <typename Consumer>
future<> consume(Consumer& c);
bool eof() { return _eof; }
private:
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
};
// Facilitates data buffering before it's handed over to data_sink.
//
// When trim_to_size is true it's guaranteed that data sink will not receive
// chunks larger than the configured size, which could be the case when a
// single write call is made with data larger than the configured size.
//
// The data sink will not receive empty chunks.
//
template <typename CharType>
class output_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
size_t _size;
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf);
public:
using char_type = CharType;
output_stream(data_sink fd, size_t size, bool trim_to_size = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);
future<> write(const sstring& s);
future<> write(net::packet p);
future<> write(scattered_message<char_type> msg);
future<> flush();
future<> close() { return _fd.close(); }
private:
};
#include "iostream-impl.hh"

View File

@@ -22,6 +22,7 @@
#ifndef REACTOR_HH_
#define REACTOR_HH_
#include "iostream.hh"
#include <memory>
#include <type_traits>
#include <libaio.h>
@@ -910,162 +911,6 @@ pollable_fd_state::~pollable_fd_state() {
engine().forget(*this);
}
class data_source_impl {
public:
virtual ~data_source_impl() {}
virtual future<temporary_buffer<char>> get() = 0;
};
class data_source {
std::unique_ptr<data_source_impl> _dsi;
protected:
data_source_impl* impl() const { return _dsi.get(); }
public:
explicit data_source(std::unique_ptr<data_source_impl> dsi) : _dsi(std::move(dsi)) {}
data_source(data_source&& x) = default;
future<temporary_buffer<char>> get() { return _dsi->get(); }
};
class data_sink_impl {
public:
virtual ~data_sink_impl() {}
virtual future<> put(net::packet data) = 0;
virtual future<> put(std::vector<temporary_buffer<char>> data) {
net::packet p;
p.reserve(data.size());
for (auto& buf : data) {
p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
}
return put(std::move(p));
}
virtual future<> put(temporary_buffer<char> buf) {
return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
}
virtual future<> close() = 0;
};
class data_sink {
std::unique_ptr<data_sink_impl> _dsi;
public:
explicit data_sink(std::unique_ptr<data_sink_impl> dsi) : _dsi(std::move(dsi)) {}
data_sink(data_sink&& x) = default;
future<> put(std::vector<temporary_buffer<char>> data) {
return _dsi->put(std::move(data));
}
future<> put(temporary_buffer<char> data) {
return _dsi->put(std::move(data));
}
future<> put(net::packet p) {
return _dsi->put(std::move(p));
}
future<> close() { return _dsi->close(); }
};
template <typename CharType>
class input_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_source _fd;
temporary_buffer<CharType> _buf;
bool _eof = false;
private:
using tmp_buf = temporary_buffer<CharType>;
size_t available() const { return _buf.size(); }
protected:
void reset() { _buf = {}; }
data_source* fd() { return &_fd; }
public:
// Consumer concept, for consume() method:
struct ConsumerConcept {
// call done(tmp_buf) to signal end of processing. tmp_buf parameter to
// done is unconsumed data
template <typename Done>
void operator()(tmp_buf data, Done done);
};
using char_type = CharType;
explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {}
future<temporary_buffer<CharType>> read_exactly(size_t n);
template <typename Consumer>
future<> consume(Consumer& c);
bool eof() { return _eof; }
private:
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
};
// Facilitates data buffering before it's handed over to data_sink.
//
// When trim_to_size is true it's guaranteed that data sink will not receive
// chunks larger than the configured size, which could be the case when a
// single write call is made with data larger than the configured size.
//
// The data sink will not receive empty chunks.
//
template <typename CharType>
class output_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
size_t _size;
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf);
public:
using char_type = CharType;
output_stream(data_sink fd, size_t size, bool trim_to_size = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);
future<> write(const sstring& s);
future<> write(net::packet p);
future<> write(scattered_message<char_type> msg);
future<> flush();
future<> close() { return _fd.close(); }
private:
};
template<typename CharType>
inline
future<> output_stream<CharType>::write(const char_type* buf) {
return write(buf, strlen(buf));
}
template<typename CharType>
inline
future<> output_stream<CharType>::write(const sstring& s) {
return write(s.c_str(), s.size());
}
template<typename CharType>
future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
return write(std::move(msg).release());
}
template<typename CharType>
future<> output_stream<CharType>::write(net::packet p) {
static_assert(std::is_same<CharType, char>::value, "packet works on char");
if (p.len() == 0) {
return make_ready_future<>();
}
assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
if (!_trim_to_size || p.len() <= _size) {
// TODO: aggregate buffers for later coalescing. Currently we flush right
// after appending the message anyway, so it doesn't matter.
return _fd.put(std::move(p));
}
auto head = p.share(0, _size);
p.trim_front(_size);
return _fd.put(std::move(head)).then([this, p = std::move(p)] () mutable {
return write(std::move(p));
});
}
inline
size_t iovec_len(const iovec* begin, size_t len)
{
@@ -1178,166 +1023,9 @@ void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_
enable_fn();
}
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
if (available()) {
auto now = std::min(n - completed, available());
std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
_buf.trim_front(now);
completed += now;
}
if (completed == n) {
return make_ready_future<tmp_buf>(std::move(out));
}
// _buf is now empty
return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
if (buf.size() == 0) {
return make_ready_future<tmp_buf>(std::move(buf));
}
_buf = std::move(buf);
return this->read_exactly_part(n, std::move(out), completed);
});
}
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly(size_t n) {
if (_buf.size() == n) {
// easy case: steal buffer, return to caller
return make_ready_future<tmp_buf>(std::move(_buf));
} else if (_buf.size() > n) {
// buffer large enough, share it with caller
auto front = _buf.share(0, n);
_buf.trim_front(n);
return make_ready_future<tmp_buf>(std::move(front));
} else if (_buf.size() == 0) {
// buffer is empty: grab one and retry
return _fd.get().then([this, n] (auto buf) mutable {
if (buf.size() == 0) {
return make_ready_future<tmp_buf>(std::move(buf));
}
_buf = std::move(buf);
return this->read_exactly(n);
});
} else {
// buffer too small: start copy/read loop
tmp_buf b(n);
return read_exactly_part(n, std::move(b), 0);
}
}
template <typename CharType>
template <typename Consumer>
future<>
input_stream<CharType>::consume(Consumer& consumer) {
if (_buf.empty() && !_eof) {
return _fd.get().then([this, &consumer] (tmp_buf buf) {
_buf = std::move(buf);
_eof = _buf.empty();
return consume(consumer);
});
} else {
auto tmp = std::move(_buf);
bool done = tmp.empty();
consumer(std::move(tmp), [this, &done] (tmp_buf unconsumed) {
done = true;
if (!unconsumed.empty()) {
_buf = std::move(unconsumed);
}
});
if (!done) {
return consume(consumer);
} else {
return make_ready_future<>();
}
}
}
#include <iostream>
#include "sstring.hh"
// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
template <typename CharType>
future<>
output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
assert(_end == 0);
if (buf.size() < _size) {
if (!_buf) {
_buf = temporary_buffer<char>(_size);
}
std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
_end = buf.size();
return make_ready_future<>();
}
auto chunk = buf.share(0, _size);
buf.trim_front(_size);
return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable {
return split_and_put(std::move(buf));
});
}
template <typename CharType>
future<>
output_stream<CharType>::write(const char_type* buf, size_t n) {
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
if (n >= bulk_threshold) {
if (_end) {
auto now = _size - _end;
std::copy(buf, buf + now, _buf.get_write() + _end);
_end = _size;
temporary_buffer<char> tmp(n - now);
std::copy(buf + now, buf + n, tmp.get_write());
return flush().then([this, tmp = std::move(tmp)]() mutable {
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
});
} else {
temporary_buffer<char> tmp(n);
std::copy(buf, buf + n, tmp.get_write());
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
}
}
if (!_buf) {
_buf = temporary_buffer<char>(_size);
}
auto now = std::min(n, _size - _end);
std::copy(buf, buf + now, _buf.get_write() + _end);
_end += now;
if (now == n) {
return make_ready_future<>();
} else {
temporary_buffer<CharType> next(_size);
std::copy(buf + now, buf + n, next.get_write());
_end = n - now;
std::swap(next, _buf);
return _fd.put(std::move(next));
}
}
template <typename CharType>
future<>
output_stream<CharType>::flush() {
if (!_end) {
return make_ready_future<>();
}
_buf.trim(_end);
_end = 0;
return _fd.put(std::move(_buf));
}
inline
future<size_t> pollable_fd::read_some(char* buffer, size_t size) {
return engine().read_some(*_s, buffer, size);

View File

@@ -26,6 +26,7 @@
#include "core/deleter.hh"
#include "core/temporary_buffer.hh"
#include "net/packet.hh"
#include "sstring.hh"
#include <memory>
#include <vector>