mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-24 18:40:38 +00:00
Merge seastar upstream
This commit is contained in:
19
core/file.hh
19
core/file.hh
@@ -62,6 +62,7 @@ public:
|
||||
virtual future<struct stat> stat(void) = 0;
|
||||
virtual future<> truncate(uint64_t length) = 0;
|
||||
virtual future<> discard(uint64_t offset, uint64_t length) = 0;
|
||||
virtual future<> allocate(uint64_t position, uint64_t length) = 0;
|
||||
virtual future<size_t> size(void) = 0;
|
||||
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) = 0;
|
||||
|
||||
@@ -86,6 +87,7 @@ public:
|
||||
future<struct stat> stat(void);
|
||||
future<> truncate(uint64_t length);
|
||||
future<> discard(uint64_t offset, uint64_t length);
|
||||
virtual future<> allocate(uint64_t position, uint64_t length) override;
|
||||
future<size_t> size(void);
|
||||
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override;
|
||||
};
|
||||
@@ -96,6 +98,7 @@ public:
|
||||
future<> truncate(uint64_t length) override;
|
||||
future<> discard(uint64_t offset, uint64_t length) override;
|
||||
future<size_t> size(void) override;
|
||||
virtual future<> allocate(uint64_t position, uint64_t length) override;
|
||||
};
|
||||
|
||||
inline
|
||||
@@ -228,6 +231,22 @@ public:
|
||||
return _file_impl->truncate(length);
|
||||
}
|
||||
|
||||
/// Preallocate disk blocks for a specified byte range.
|
||||
///
|
||||
/// Requests the file system to allocate disk blocks to
|
||||
/// back the specified range (\c length bytes starting at
|
||||
/// \c position). The range may be outside the current file
|
||||
/// size; the blocks can then be used when appending to the
|
||||
/// file.
|
||||
///
|
||||
/// \param position beginning of the range at which to allocate
|
||||
/// blocks.
|
||||
/// \parm length length of range to allocate.
|
||||
/// \return future that becomes ready when the operation completes.
|
||||
future<> allocate(uint64_t position, uint64_t length) {
|
||||
return _file_impl->allocate(position, length);
|
||||
}
|
||||
|
||||
future<> discard(uint64_t offset, uint64_t length) {
|
||||
return _file_impl->discard(offset, length);
|
||||
}
|
||||
|
||||
@@ -81,11 +81,12 @@ input_stream<char> make_file_input_stream(
|
||||
|
||||
class file_data_sink_impl : public data_sink_impl {
|
||||
lw_shared_ptr<file> _file;
|
||||
size_t _buffer_size;
|
||||
file_output_stream_options _options;
|
||||
uint64_t _pos = 0;
|
||||
uint64_t _last_preallocation = 0;
|
||||
public:
|
||||
file_data_sink_impl(lw_shared_ptr<file> f, size_t buffer_size)
|
||||
: _file(std::move(f)), _buffer_size(buffer_size) {}
|
||||
file_data_sink_impl(lw_shared_ptr<file> f, file_output_stream_options options)
|
||||
: _file(std::move(f)), _options(options) {}
|
||||
future<> put(net::packet data) { return make_ready_future<>(); }
|
||||
virtual temporary_buffer<char> allocate_buffer(size_t size) override {
|
||||
// buffers to dma_write must be aligned to 512 bytes.
|
||||
@@ -108,12 +109,20 @@ public:
|
||||
buf_size = buf.size();
|
||||
truncate = true;
|
||||
}
|
||||
return _file->dma_write(pos, p, buf_size).then(
|
||||
[this, buf = std::move(buf), truncate] (size_t size) {
|
||||
if (truncate) {
|
||||
return _file->truncate(_pos);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
auto prealloc = make_ready_future<>();
|
||||
if (pos + buf_size > _last_preallocation) {
|
||||
auto old = _last_preallocation;
|
||||
_last_preallocation = align_down<uint64_t>(old + _options.preallocation_size, _options.preallocation_size);
|
||||
prealloc = _file->allocate(old, _last_preallocation - old);
|
||||
}
|
||||
return prealloc.then([this, pos, p, buf_size, truncate, buf = std::move(buf)] () mutable {
|
||||
_file->dma_write(pos, p, buf_size).then(
|
||||
[this, buf = std::move(buf), truncate] (size_t size) {
|
||||
if (truncate) {
|
||||
return _file->truncate(_pos);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
future<> close() { return _file->flush(); }
|
||||
@@ -121,11 +130,18 @@ public:
|
||||
|
||||
class file_data_sink : public data_sink {
|
||||
public:
|
||||
file_data_sink(lw_shared_ptr<file> f, size_t buffer_size)
|
||||
file_data_sink(lw_shared_ptr<file> f, file_output_stream_options options)
|
||||
: data_sink(std::make_unique<file_data_sink_impl>(
|
||||
std::move(f), buffer_size)) {}
|
||||
std::move(f), options)) {}
|
||||
};
|
||||
|
||||
output_stream<char> make_file_output_stream(lw_shared_ptr<file> f, size_t buffer_size) {
|
||||
return output_stream<char>(file_data_sink(std::move(f), buffer_size), buffer_size);
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = buffer_size;
|
||||
return make_file_output_stream(std::move(f), options);
|
||||
}
|
||||
|
||||
output_stream<char> make_file_output_stream(lw_shared_ptr<file> f, file_output_stream_options options) {
|
||||
return output_stream<char>(file_data_sink(std::move(f), options), options.buffer_size);
|
||||
}
|
||||
|
||||
|
||||
@@ -39,9 +39,22 @@ input_stream<char> make_file_input_stream(
|
||||
lw_shared_ptr<file> file, uint64_t offset = 0,
|
||||
uint64_t buffer_size = 8192);
|
||||
|
||||
struct file_output_stream_options {
|
||||
unsigned buffer_size = 8192;
|
||||
unsigned preallocation_size = 1024*1024; // 1MB
|
||||
};
|
||||
|
||||
// Create an output_stream for writing starting at the position zero of a
|
||||
// newly created file.
|
||||
// NOTE: flush() should be the last thing to be called on a file output stream.
|
||||
output_stream<char> make_file_output_stream(
|
||||
lw_shared_ptr<file> file,
|
||||
uint64_t buffer_size = 8192);
|
||||
|
||||
/// Create an output_stream for writing starting at the position zero of a
|
||||
/// newly created file.
|
||||
/// NOTE: flush() should be the last thing to be called on a file output stream.
|
||||
output_stream<char> make_file_output_stream(
|
||||
lw_shared_ptr<file> file,
|
||||
file_output_stream_options options);
|
||||
|
||||
|
||||
@@ -571,6 +571,26 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) {
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
posix_file_impl::allocate(uint64_t position, uint64_t length) {
|
||||
// FALLOC_FL_ZERO_RANGE is fairly new, so don't fail if it's not supported.
|
||||
static bool supported = true;
|
||||
if (!supported) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return engine()._thread_pool.submit<syscall_result<int>>([this, position, length] () mutable {
|
||||
auto ret = ::fallocate(_fd, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE, position, length);
|
||||
if (ret == -1 && errno == EOPNOTSUPP) {
|
||||
ret = 0;
|
||||
supported = false; // Racy, but harmless. At most we issue an extra call or two.
|
||||
}
|
||||
return wrap_syscall<int>(ret);
|
||||
}).then([] (syscall_result<int> sr) {
|
||||
sr.throw_if_error();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
blockdev_file_impl::discard(uint64_t offset, uint64_t length) {
|
||||
return engine()._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
|
||||
@@ -582,6 +602,12 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) {
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
blockdev_file_impl::allocate(uint64_t position, uint64_t length) {
|
||||
// nothing to do for block device
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<size_t>
|
||||
posix_file_impl::size(void) {
|
||||
return posix_file_impl::stat().then([] (struct stat&& st) {
|
||||
|
||||
Reference in New Issue
Block a user