diff --git a/core/fstream.cc b/core/fstream.cc index 32580b0e83..5af5592289 100644 --- a/core/fstream.cc +++ b/core/fstream.cc @@ -20,7 +20,9 @@ */ #include "fstream.hh" +#include "align.hh" #include +#include class file_data_source_impl : public data_source_impl { lw_shared_ptr _file; @@ -63,3 +65,56 @@ input_stream make_file_input_stream( lw_shared_ptr f, uint64_t offset, size_t buffer_size) { return input_stream(file_data_source(std::move(f), offset, buffer_size)); } + +class file_data_sink_impl : public data_sink_impl { + lw_shared_ptr _file; + size_t _buffer_size; + uint64_t _pos = 0; +public: + file_data_sink_impl(lw_shared_ptr f, size_t buffer_size) + : _file(std::move(f)), _buffer_size(buffer_size) {} + future<> put(net::packet data) { return make_ready_future<>(); } + virtual temporary_buffer allocate_buffer(size_t size) override { + // buffers to dma_write must be aligned to 512 bytes. + return temporary_buffer::aligned(512, size); + } + virtual future<> put(temporary_buffer buf) override { + bool truncate = false; + auto pos = _pos; + _pos += buf.size(); + auto p = static_cast(buf.get()); + size_t buf_size = buf.size(); + + if ((buf.size() & 511) != 0) { + // If buf size isn't aligned, copy its content into a new aligned buf. + // This should only happen when the user calls output_stream::flush(). + auto tmp = allocate_buffer(align_up(buf.size(), 512UL)); + ::memcpy(tmp.get_write(), buf.get(), buf.size()); + buf = std::move(tmp); + p = buf.get(); + buf_size = buf.size(); + truncate = true; + } + return _file->dma_write(pos, p, buf_size).then( + [this, buf = std::move(buf), truncate] (size_t size) { + if (truncate) { + return _file->truncate(_pos).then([this] { + return _file->flush(); + }); + } + return make_ready_future<>(); + }); + } + future<> close() { return make_ready_future<>(); } +}; + +class file_data_sink : public data_sink { +public: + file_data_sink(lw_shared_ptr f, size_t buffer_size) + : data_sink(std::make_unique( + std::move(f), buffer_size)) {} +}; + +output_stream make_file_output_stream(lw_shared_ptr f, size_t buffer_size) { + return output_stream(file_data_sink(std::move(f), buffer_size), buffer_size); +} diff --git a/core/fstream.hh b/core/fstream.hh index f0590b8022..44964deb74 100644 --- a/core/fstream.hh +++ b/core/fstream.hh @@ -38,3 +38,10 @@ input_stream make_file_input_stream( lw_shared_ptr file, uint64_t offset = 0, uint64_t buffer_size = 8192); + +// Create an output_stream for reading starting at the position zero of a +// newly created file. +// NOTE: flush() should be the last thing to be called on a file output stream. +output_stream make_file_output_stream( + lw_shared_ptr file, + uint64_t buffer_size = 8192); diff --git a/core/iostream.hh b/core/iostream.hh index 3f000d4939..b0785c4083 100644 --- a/core/iostream.hh +++ b/core/iostream.hh @@ -144,7 +144,7 @@ private: // The data sink will not receive empty chunks. // template -class output_stream { +class output_stream final { static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); data_sink _fd; temporary_buffer _buf;