sstables: coroutinize remove_by_toc_name

Test: unit(dev)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220214140029.1513522-1-bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-02-14 16:00:28 +02:00
committed by Avi Kivity
parent 4e3038b57f
commit e5fc4b6f5d

View File

@@ -2464,81 +2464,59 @@ fsync_directory(const io_error_handler& error_handler, sstring fname) {
}
static future<>
remove_by_toc_name(std::string_view sstable_toc_strview) noexcept {
sstring sstable_toc_name, prefix, new_toc_name, dir;
try {
sstable_toc_name = sstring(sstable_toc_strview);
prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
} catch (...) {
return current_exception_as_future();
}
remove_by_toc_name(sstring sstable_toc_name) {
auto dir = dirname(sstable_toc_name);
sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
return do_with(std::move(sstable_toc_name), std::move(prefix), std::move(new_toc_name), sstring(),
[] (sstring& sstable_toc_name, sstring& prefix, sstring& new_toc_name, sstring& dir) {
sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
return sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name).then([&] (bool toc_exists) {
if (toc_exists) {
dir = dirname(sstable_toc_name);
// If new_toc_name exists it will be atomically replaced. See rename(2)
return sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name).then([&dir] {
return fsync_directory(sstable_write_error_handler, dir);
}).then([] {
return make_ready_future<bool>(true);
});
} else {
return sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name);
sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
if (co_await sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name)) {
// If new_toc_name exists it will be atomically replaced. See rename(2)
co_await sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name);
co_await fsync_directory(sstable_write_error_handler, dir);
} else {
if (!co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name)) {
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
co_return;
}
}
auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro);
auto in = make_file_input_stream(toc_file);
std::vector<sstring> components;
std::exception_ptr ex;
try {
auto size = co_await toc_file.size();
auto text = co_await in.read_exactly(size);
sstring all(text.begin(), text.end());
boost::split(components, all, boost::is_any_of("\n"));
} catch (...) {
ex = std::current_exception();
}
co_await in.close();
if (ex) {
std::rethrow_exception(std::move(ex));
}
co_await parallel_for_each(components, [&prefix] (sstring component) -> future<> {
if (component.empty()) {
// eof
co_return;
}
if (component == sstable_version_constants::TOC_SUFFIX) {
// already renamed
co_return;
}
auto fname = prefix + component;
try {
co_await sstable_io_check(sstable_write_error_handler, remove_file, fname);
} catch (...) {
if (!is_system_error_errno(ENOENT)) {
throw;
}
}).then([&] (bool exists) {
if (!exists) {
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
return make_ready_future<>();
} else {
dir = dirname(new_toc_name);
}
return with_file(open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro), [&] (file& toc_file) {
return toc_file.size().then([&] (size_t size) {
return do_with(make_file_input_stream(toc_file), [&, size] (input_stream<char>& in) {
return in.read_exactly(size).then([&] (temporary_buffer<char> text) {
std::vector<sstring> components;
sstring all(text.begin(), text.end());
boost::split(components, all, boost::is_any_of("\n"));
return parallel_for_each(components, [&prefix] (sstring component) {
if (component.empty()) {
// eof
return make_ready_future<>();
}
if (component == sstable_version_constants::TOC_SUFFIX) {
// already renamed
return make_ready_future<>();
}
auto fname = prefix + component;
return sstable_io_check(sstable_write_error_handler, remove_file, fname).handle_exception([fname = std::move(fname)] (std::exception_ptr eptr) {
// forgive ENOENT, since the component may not have been written;
try {
std::rethrow_exception(eptr);
} catch (const std::system_error& e) {
if (!is_system_error_errno(ENOENT)) {
return make_exception_future<>(eptr);
}
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
return make_ready_future<>();
}
__builtin_unreachable();
});
}).then([&dir] {
return fsync_directory(sstable_write_error_handler, dir);
}).then([&new_toc_name] {
return sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
});
}).finally([&in] () mutable {
return in.close();
});
});
});
});
});
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
}
});
co_await fsync_directory(sstable_write_error_handler, dir);
co_await sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
}
future<>