diff --git a/core/file.hh b/core/file.hh index 66966e291d..e1ab3649ad 100644 --- a/core/file.hh +++ b/core/file.hh @@ -62,6 +62,7 @@ public: virtual future stat(void) = 0; virtual future<> truncate(uint64_t length) = 0; virtual future<> discard(uint64_t offset, uint64_t length) = 0; + virtual future<> allocate(uint64_t position, uint64_t length) = 0; virtual future size(void) = 0; virtual subscription list_directory(std::function (directory_entry de)> next) = 0; @@ -86,6 +87,7 @@ public: future stat(void); future<> truncate(uint64_t length); future<> discard(uint64_t offset, uint64_t length); + virtual future<> allocate(uint64_t position, uint64_t length) override; future size(void); virtual subscription list_directory(std::function (directory_entry de)> next) override; }; @@ -96,6 +98,7 @@ public: future<> truncate(uint64_t length) override; future<> discard(uint64_t offset, uint64_t length) override; future size(void) override; + virtual future<> allocate(uint64_t position, uint64_t length) override; }; inline @@ -228,6 +231,22 @@ public: return _file_impl->truncate(length); } + /// Preallocate disk blocks for a specified byte range. + /// + /// Requests the file system to allocate disk blocks to + /// back the specified range (\c length bytes starting at + /// \c position). The range may be outside the current file + /// size; the blocks can then be used when appending to the + /// file. + /// + /// \param position beginning of the range at which to allocate + /// blocks. + /// \param length length of range to allocate. + /// \return future that becomes ready when the operation completes. 
+ future<> allocate(uint64_t position, uint64_t length) { + return _file_impl->allocate(position, length); + } + future<> discard(uint64_t offset, uint64_t length) { return _file_impl->discard(offset, length); } diff --git a/core/fstream.cc b/core/fstream.cc index 103e34df54..bc7b05d7c1 100644 --- a/core/fstream.cc +++ b/core/fstream.cc @@ -81,11 +81,12 @@ input_stream make_file_input_stream( class file_data_sink_impl : public data_sink_impl { lw_shared_ptr _file; - size_t _buffer_size; + file_output_stream_options _options; uint64_t _pos = 0; + uint64_t _last_preallocation = 0; public: - file_data_sink_impl(lw_shared_ptr f, size_t buffer_size) - : _file(std::move(f)), _buffer_size(buffer_size) {} + file_data_sink_impl(lw_shared_ptr f, file_output_stream_options options) + : _file(std::move(f)), _options(options) {} future<> put(net::packet data) { return make_ready_future<>(); } virtual temporary_buffer allocate_buffer(size_t size) override { // buffers to dma_write must be aligned to 512 bytes. 
@@ -108,12 +109,20 @@ public: buf_size = buf.size(); truncate = true; } - return _file->dma_write(pos, p, buf_size).then( - [this, buf = std::move(buf), truncate] (size_t size) { - if (truncate) { - return _file->truncate(_pos); - } - return make_ready_future<>(); + auto prealloc = make_ready_future<>(); + if (pos + buf_size > _last_preallocation) { + auto old = _last_preallocation; + _last_preallocation = align_down(old + _options.preallocation_size, _options.preallocation_size); + prealloc = _file->allocate(old, _last_preallocation - old); + } + return prealloc.then([this, pos, p, buf_size, truncate, buf = std::move(buf)] () mutable { + return _file->dma_write(pos, p, buf_size).then( + [this, buf = std::move(buf), truncate] (size_t size) { + if (truncate) { + return _file->truncate(_pos); + } + return make_ready_future<>(); + }); }); } future<> close() { return _file->flush(); } @@ -121,11 +130,18 @@ public: class file_data_sink : public data_sink { public: - file_data_sink(lw_shared_ptr f, size_t buffer_size) + file_data_sink(lw_shared_ptr f, file_output_stream_options options) : data_sink(std::make_unique( - std::move(f), buffer_size)) {} + std::move(f), options)) {} }; output_stream make_file_output_stream(lw_shared_ptr f, size_t buffer_size) { - return output_stream(file_data_sink(std::move(f), buffer_size), buffer_size); + file_output_stream_options options; + options.buffer_size = buffer_size; + return make_file_output_stream(std::move(f), options); } + +output_stream make_file_output_stream(lw_shared_ptr f, file_output_stream_options options) { + return output_stream(file_data_sink(std::move(f), options), options.buffer_size); +} + diff --git a/core/fstream.hh b/core/fstream.hh index 8942018f56..1fd62e49a8 100644 --- a/core/fstream.hh +++ b/core/fstream.hh @@ -39,9 +39,22 @@ input_stream make_file_input_stream( lw_shared_ptr file, uint64_t offset = 0, uint64_t buffer_size = 8192); +struct file_output_stream_options { + unsigned buffer_size = 8192; + unsigned 
preallocation_size = 1024*1024; // 1MB +}; + // Create an output_stream for writing starting at the position zero of a // newly created file. // NOTE: flush() should be the last thing to be called on a file output stream. output_stream make_file_output_stream( lw_shared_ptr file, uint64_t buffer_size = 8192); + +/// Create an output_stream for writing starting at the position zero of a +/// newly created file. +/// NOTE: flush() should be the last thing to be called on a file output stream. +output_stream make_file_output_stream( + lw_shared_ptr file, + file_output_stream_options options); + diff --git a/core/reactor.cc b/core/reactor.cc index 8e5992a2d0..e29dd8ea4e 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -571,6 +571,26 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) { }); } +future<> +posix_file_impl::allocate(uint64_t position, uint64_t length) { + // FALLOC_FL_ZERO_RANGE is fairly new, so don't fail if it's not supported. + static bool supported = true; + if (!supported) { + return make_ready_future<>(); + } + return engine()._thread_pool.submit>([this, position, length] () mutable { + auto ret = ::fallocate(_fd, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE, position, length); + if (ret == -1 && errno == EOPNOTSUPP) { + ret = 0; + supported = false; // Racy, but harmless. At most we issue an extra call or two. + } + return wrap_syscall(ret); + }).then([] (syscall_result sr) { + sr.throw_if_error(); + return make_ready_future<>(); + }); +} + future<> blockdev_file_impl::discard(uint64_t offset, uint64_t length) { return engine()._thread_pool.submit>([this, offset, length] () mutable { @@ -582,6 +602,12 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) { }); } +future<> +blockdev_file_impl::allocate(uint64_t position, uint64_t length) { + // nothing to do for block device + return make_ready_future<>(); +} + future posix_file_impl::size(void) { return posix_file_impl::stat().then([] (struct stat&& st) {