sstables: coroutinize remove_by_toc_name
Test: unit(dev) Signed-off-by: Benny Halevy <bhalevy@scylladb.com> Message-Id: <20220214140029.1513522-1-bhalevy@scylladb.com>
This commit is contained in:
@@ -2464,81 +2464,59 @@ fsync_directory(const io_error_handler& error_handler, sstring fname) {
|
||||
}
|
||||
|
||||
static future<>
|
||||
remove_by_toc_name(std::string_view sstable_toc_strview) noexcept {
|
||||
sstring sstable_toc_name, prefix, new_toc_name, dir;
|
||||
try {
|
||||
sstable_toc_name = sstring(sstable_toc_strview);
|
||||
prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
|
||||
new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
|
||||
} catch (...) {
|
||||
return current_exception_as_future();
|
||||
}
|
||||
remove_by_toc_name(sstring sstable_toc_name) {
|
||||
auto dir = dirname(sstable_toc_name);
|
||||
sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
|
||||
sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
|
||||
|
||||
return do_with(std::move(sstable_toc_name), std::move(prefix), std::move(new_toc_name), sstring(),
|
||||
[] (sstring& sstable_toc_name, sstring& prefix, sstring& new_toc_name, sstring& dir) {
|
||||
sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
|
||||
return sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name).then([&] (bool toc_exists) {
|
||||
if (toc_exists) {
|
||||
dir = dirname(sstable_toc_name);
|
||||
// If new_toc_name exists it will be atomically replaced. See rename(2)
|
||||
return sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name).then([&dir] {
|
||||
return fsync_directory(sstable_write_error_handler, dir);
|
||||
}).then([] {
|
||||
return make_ready_future<bool>(true);
|
||||
});
|
||||
} else {
|
||||
return sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name);
|
||||
sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
|
||||
if (co_await sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name)) {
|
||||
// If new_toc_name exists it will be atomically replaced. See rename(2)
|
||||
co_await sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name);
|
||||
co_await fsync_directory(sstable_write_error_handler, dir);
|
||||
} else {
|
||||
if (!co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name)) {
|
||||
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro);
|
||||
auto in = make_file_input_stream(toc_file);
|
||||
std::vector<sstring> components;
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
auto size = co_await toc_file.size();
|
||||
auto text = co_await in.read_exactly(size);
|
||||
sstring all(text.begin(), text.end());
|
||||
boost::split(components, all, boost::is_any_of("\n"));
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await in.close();
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
co_await parallel_for_each(components, [&prefix] (sstring component) -> future<> {
|
||||
if (component.empty()) {
|
||||
// eof
|
||||
co_return;
|
||||
}
|
||||
if (component == sstable_version_constants::TOC_SUFFIX) {
|
||||
// already renamed
|
||||
co_return;
|
||||
}
|
||||
auto fname = prefix + component;
|
||||
try {
|
||||
co_await sstable_io_check(sstable_write_error_handler, remove_file, fname);
|
||||
} catch (...) {
|
||||
if (!is_system_error_errno(ENOENT)) {
|
||||
throw;
|
||||
}
|
||||
}).then([&] (bool exists) {
|
||||
if (!exists) {
|
||||
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
|
||||
return make_ready_future<>();
|
||||
} else {
|
||||
dir = dirname(new_toc_name);
|
||||
}
|
||||
return with_file(open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro), [&] (file& toc_file) {
|
||||
return toc_file.size().then([&] (size_t size) {
|
||||
return do_with(make_file_input_stream(toc_file), [&, size] (input_stream<char>& in) {
|
||||
return in.read_exactly(size).then([&] (temporary_buffer<char> text) {
|
||||
std::vector<sstring> components;
|
||||
sstring all(text.begin(), text.end());
|
||||
boost::split(components, all, boost::is_any_of("\n"));
|
||||
return parallel_for_each(components, [&prefix] (sstring component) {
|
||||
if (component.empty()) {
|
||||
// eof
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (component == sstable_version_constants::TOC_SUFFIX) {
|
||||
// already renamed
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto fname = prefix + component;
|
||||
return sstable_io_check(sstable_write_error_handler, remove_file, fname).handle_exception([fname = std::move(fname)] (std::exception_ptr eptr) {
|
||||
// forgive ENOENT, since the component may not have been written;
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
} catch (const std::system_error& e) {
|
||||
if (!is_system_error_errno(ENOENT)) {
|
||||
return make_exception_future<>(eptr);
|
||||
}
|
||||
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
__builtin_unreachable();
|
||||
});
|
||||
}).then([&dir] {
|
||||
return fsync_directory(sstable_write_error_handler, dir);
|
||||
}).then([&new_toc_name] {
|
||||
return sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
|
||||
});
|
||||
}).finally([&in] () mutable {
|
||||
return in.close();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
|
||||
}
|
||||
});
|
||||
co_await fsync_directory(sstable_write_error_handler, dir);
|
||||
co_await sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
Reference in New Issue
Block a user