mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-24 18:40:38 +00:00
Merge "file::close() support"
close() is a blocking call, so it must be called in the I/O thread, not the main reactor thread. To do that, we need a file::close() method that can return a future.
This commit is contained in:
21
core/file.hh
21
core/file.hh
@@ -64,6 +64,7 @@ public:
|
||||
virtual future<> discard(uint64_t offset, uint64_t length) = 0;
|
||||
virtual future<> allocate(uint64_t position, uint64_t length) = 0;
|
||||
virtual future<size_t> size(void) = 0;
|
||||
virtual future<> close() = 0;
|
||||
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) = 0;
|
||||
|
||||
friend class reactor;
|
||||
@@ -73,12 +74,7 @@ class posix_file_impl : public file_impl {
|
||||
public:
|
||||
int _fd;
|
||||
posix_file_impl(int fd) : _fd(fd) {}
|
||||
~posix_file_impl() {
|
||||
if (_fd != -1) {
|
||||
::close(_fd);
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~posix_file_impl() override;
|
||||
future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len);
|
||||
future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov);
|
||||
future<size_t> read_dma(uint64_t pos, void* buffer, size_t len);
|
||||
@@ -89,6 +85,7 @@ public:
|
||||
future<> discard(uint64_t offset, uint64_t length);
|
||||
virtual future<> allocate(uint64_t position, uint64_t length) override;
|
||||
future<size_t> size(void);
|
||||
virtual future<> close() override;
|
||||
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override;
|
||||
};
|
||||
|
||||
@@ -254,6 +251,18 @@ public:
|
||||
return _file_impl->size();
|
||||
}
|
||||
|
||||
/// Closes the file.
|
||||
///
|
||||
/// Flushes any pending operations and release any resources associated with
|
||||
/// the file (except for stable storage).
|
||||
///
|
||||
/// \note
|
||||
/// to ensure file data reaches stable storage, you must call \ref flush()
|
||||
/// before calling \c close().
|
||||
future<> close() {
|
||||
return _file_impl->close();
|
||||
}
|
||||
|
||||
subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) {
|
||||
return _file_impl->list_directory(std::move(next));
|
||||
}
|
||||
|
||||
@@ -128,7 +128,11 @@ public:
|
||||
});
|
||||
});
|
||||
}
|
||||
future<> close() { return _file->flush(); }
|
||||
future<> close() {
|
||||
return _file->flush().then([this] {
|
||||
return _file->close();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
class file_data_sink : public data_sink {
|
||||
|
||||
@@ -471,6 +471,17 @@ bool reactor::process_io()
|
||||
return n;
|
||||
}
|
||||
|
||||
posix_file_impl::~posix_file_impl() {
|
||||
if (_fd != -1) {
|
||||
if (std::uncaught_exception()) {
|
||||
std::cerr << "WARNING: closing file in reactor thread during exception recovery\n";
|
||||
} else {
|
||||
std::cerr << "WARNING: closing file in reactor thread\n";
|
||||
}
|
||||
::close(_fd);
|
||||
}
|
||||
}
|
||||
|
||||
future<size_t>
|
||||
posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) {
|
||||
return engine().submit_io_write([this, pos, buffer, len] (iocb& io) {
|
||||
@@ -696,6 +707,16 @@ posix_file_impl::size(void) {
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
posix_file_impl::close() {
|
||||
return engine()._thread_pool.submit<syscall_result<int>>([fd = _fd] {
|
||||
return wrap_syscall<int>(::close(fd));
|
||||
}).then([this] (syscall_result<int> sr) {
|
||||
_fd = -1;
|
||||
sr.throw_if_error();
|
||||
});
|
||||
}
|
||||
|
||||
future<size_t>
|
||||
blockdev_file_impl::size(void) {
|
||||
return engine()._thread_pool.submit<syscall_result_extra<size_t>>([this] {
|
||||
|
||||
@@ -59,6 +59,8 @@ SEASTAR_TEST_CASE(test1) {
|
||||
}
|
||||
return ft->sem.wait(max).then([ft] () mutable {
|
||||
return ft->f.flush();
|
||||
}).then([ft] {
|
||||
return ft->f.close();
|
||||
}).then([ft] () mutable {
|
||||
std::cout << "done\n";
|
||||
delete ft;
|
||||
|
||||
Reference in New Issue
Block a user