Merge 'Remove sstables::remove_by_toc_name()' from Pavel Emelyanov
The helper in question complicates the logic of sstable_directory::process() by making garbage collection work differently for sstables deleted "atomically" and for those deleted "one-by-one". Also, the code that deletes sstables one-by-one and uses remove_by_toc_name() performs a redundant read of the TOC file, because there is an sstable object at hand that already has all_components() ready for use. Surprisingly, there was no test for the deletion-log functionality. This PR adds one. The test passes before the g.c. and regular unlink fix, and (of course) continues passing after it. Closes scylladb/scylladb#16240 * github.com:scylladb/scylladb: sstables: Drop remove_by_name() sstables/fs_storage: Wipe by recognized+unrecognized components sstable_directory: Enlight deletion log replay sstables: Split remove_by_toc_name() test: Add test case to validate deletion log work sstable_directory: Close dir on exception sstable_directory: Fix indentation after previous patch sstable_directory: Coroutinize delete_with_pending_deletion_log() test: Sstable on_delete() is not necessarily in a thread sstable_directory: Split delete_with_pending_deletion_log()
This commit is contained in:
@@ -545,8 +545,8 @@ bool sstable_directory::compare_sstable_storage_prefix(const sstring& prefix_a,
|
||||
return size_a == size_b && sstring::traits_type::compare(prefix_a.begin(), prefix_b.begin(), size_a) == 0;
|
||||
}
|
||||
|
||||
future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_sstable> ssts) {
|
||||
return seastar::async([ssts = std::move(ssts)] {
|
||||
future<std::pair<sstring, sstring>> sstable_directory::create_pending_deletion_log(const std::vector<shared_sstable>& ssts) {
|
||||
return seastar::async([&ssts] {
|
||||
shared_sstable first = nullptr;
|
||||
min_max_tracker<generation_type> gen_tracker;
|
||||
|
||||
@@ -590,35 +590,41 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_
|
||||
close_out.close_now();
|
||||
|
||||
auto dir_f = open_directory(pending_delete_dir).get0();
|
||||
auto close_dir = deferred_close(dir_f);
|
||||
// Once flushed and closed, the temporary log file can be renamed.
|
||||
rename_file(tmp_pending_delete_log, pending_delete_log).get();
|
||||
|
||||
// Guarantee that the changes above reached the disk.
|
||||
dir_f.flush().get();
|
||||
dir_f.close().get();
|
||||
close_dir.close_now();
|
||||
sstlog.debug("{} written successfully.", pending_delete_log);
|
||||
} catch (...) {
|
||||
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
|
||||
}
|
||||
|
||||
parallel_for_each(ssts, [] (shared_sstable sst) {
|
||||
return sst->unlink(sstables::storage::sync_dir::no);
|
||||
}).get();
|
||||
|
||||
sync_directory(first->_storage->prefix()).get();
|
||||
|
||||
// Once all sstables are deleted, the log file can be removed.
|
||||
// Note: the log file will be removed also if unlink failed to remove
|
||||
// any sstable and ignored the error.
|
||||
try {
|
||||
remove_file(pending_delete_log).get();
|
||||
sstlog.debug("{} removed.", pending_delete_log);
|
||||
} catch (...) {
|
||||
sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
|
||||
}
|
||||
return std::make_pair<sstring, sstring>(std::move(pending_delete_log), first->_storage->prefix());
|
||||
});
|
||||
}
|
||||
|
||||
// Deletes a batch of sstables under the protection of a pending deletion log.
//
// The log is written first by create_pending_deletion_log(), which resolves to
// the log file name and the directory holding the sstables. Every sstable is
// then unlinked without a per-sstable directory sync, the directory is synced
// once for the whole batch, and finally the log file is removed.
// A failure to remove the log file is logged and ignored.
future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_sstable> ssts) {
    auto log_and_dir = co_await create_pending_deletion_log(ssts);
    const sstring& deletion_log = log_and_dir.first;
    const sstring& sstables_dir = log_and_dir.second;

    // sync_dir::no -- the single sync_directory() call below covers all unlinks.
    co_await coroutine::parallel_for_each(ssts, [] (shared_sstable victim) {
        return victim->unlink(sstables::storage::sync_dir::no);
    });
    co_await sync_directory(sstables_dir);

    // With all sstables gone the log has served its purpose and can be dropped.
    // Note: the log file is dropped also when unlink failed to remove
    // some sstable and ignored the error.
    try {
        co_await remove_file(deletion_log);
        sstlog.debug("{} removed.", deletion_log);
    } catch (...) {
        sstlog.warn("Error removing {}: {}. Ignoring.", deletion_log, std::current_exception());
    }
}
|
||||
|
||||
// FIXME: Go through maybe_delete_large_partitions_entry on recovery since
|
||||
// this is an indication we crashed in the middle of delete_with_pending_deletion_log
|
||||
future<> sstable_directory::filesystem_components_lister::replay_pending_delete_log(fs::path pending_delete_log) {
|
||||
@@ -633,7 +639,8 @@ future<> sstable_directory::filesystem_components_lister::replay_pending_delete_
|
||||
boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on);
|
||||
auto tocs = boost::copy_range<std::vector<sstring>>(basenames | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); }));
|
||||
co_await parallel_for_each(tocs, [&sstdir] (const sstring& name) {
|
||||
return remove_by_toc_name(sstdir + "/" + name);
|
||||
// Only move TOC to TOC.tmp, the rest will be finished by regular process
|
||||
return make_toc_temporary(sstdir + "/" + name).discard_result();
|
||||
});
|
||||
sstlog.debug("Replayed {}, removing", pending_delete_log);
|
||||
co_await remove_file(pending_delete_log.native());
|
||||
|
||||
@@ -277,6 +277,10 @@ public:
|
||||
//
|
||||
// This function only solves the second problem for now.
|
||||
static future<> delete_with_pending_deletion_log(std::vector<shared_sstable> ssts);
|
||||
// Creates the deletion log for atomic deletion of sstables (helper for the
|
||||
// above function that's also used by tests)
|
||||
// Returns a pair of "logfile name" and "directory with sstables"
|
||||
static future<std::pair<sstring, sstring>> create_pending_deletion_log(const std::vector<shared_sstable>& ssts);
|
||||
|
||||
static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept;
|
||||
};
|
||||
|
||||
@@ -2601,7 +2601,7 @@ std::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_ran
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) {
|
||||
future<sstring> make_toc_temporary(sstring sstable_toc_name, storage::sync_dir sync) {
|
||||
sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
|
||||
sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
|
||||
|
||||
@@ -2615,37 +2615,10 @@ future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) {
|
||||
} else {
|
||||
if (!co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name)) {
|
||||
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
|
||||
co_return;
|
||||
co_return "";
|
||||
}
|
||||
}
|
||||
auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro);
|
||||
std::vector<sstring> components = co_await with_closeable(std::move(toc_file), [] (file& toc_file) {
|
||||
return sstable::read_and_parse_toc(toc_file);
|
||||
});
|
||||
|
||||
co_await coroutine::parallel_for_each(components, [&prefix] (sstring component) -> future<> {
|
||||
if (component.empty()) {
|
||||
// eof
|
||||
co_return;
|
||||
}
|
||||
if (component == sstable_version_constants::TOC_SUFFIX) {
|
||||
// already renamed
|
||||
co_return;
|
||||
}
|
||||
auto fname = prefix + component;
|
||||
try {
|
||||
co_await sstable_io_check(sstable_write_error_handler, remove_file, fname);
|
||||
} catch (...) {
|
||||
if (!is_system_error_errno(ENOENT)) {
|
||||
throw;
|
||||
}
|
||||
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
|
||||
}
|
||||
});
|
||||
if (sync) {
|
||||
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
|
||||
}
|
||||
co_await sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
|
||||
co_return new_toc_name;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1017,9 +1017,10 @@ public:
|
||||
// swallows all errors and just reports them to the log.
|
||||
future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir);
|
||||
|
||||
// similar to sstable::unlink, but works on a TOC file name
|
||||
// Caller may pass sync_dir::no for batching multiple deletes in the same directory,
|
||||
// and make sure the directory is sync'ed on or after the last call.
|
||||
future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes);
|
||||
// makes sure the TOC file is temporary by moving existing TOC file or otherwise
|
||||
// checking the temporary-TOC already exists
|
||||
// resolves into the temporary-TOC file name, or into an empty string if neither
|
||||
// the TOC nor the temporary-TOC file is there
|
||||
future<sstring> make_toc_temporary(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes);
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -399,6 +399,10 @@ future<> filesystem_storage::change_state(const sstable& sst, sstable_state stat
|
||||
co_await move(sst, path.native(), std::move(new_generation), delay_commit);
|
||||
}
|
||||
|
||||
// Returns the parent directory of fname, taken after the path has been
// passed through fs::canonical().
static inline fs::path parent_path(const sstring& fname) {
    const fs::path canonical_path = fs::canonical(fs::path(fname));
    return canonical_path.parent_path();
}
|
||||
|
||||
future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
// We must be able to generate toc_filename()
|
||||
// in order to delete the sstable.
|
||||
@@ -409,7 +413,31 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
}();
|
||||
|
||||
try {
|
||||
co_await remove_by_toc_name(name, sync);
|
||||
auto new_toc_name = co_await make_toc_temporary(name, sync);
|
||||
if (!new_toc_name.empty()) {
|
||||
auto dir_name = parent_path(new_toc_name);
|
||||
|
||||
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, &dir_name] (auto component) -> future<> {
|
||||
if (component.first == component_type::TOC) {
|
||||
// already renamed
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto fname = sstable::filename(dir_name.native(), sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, component.second);
|
||||
try {
|
||||
co_await sst.sstable_write_io_check(remove_file, fname);
|
||||
} catch (...) {
|
||||
if (!is_system_error_errno(ENOENT)) {
|
||||
throw;
|
||||
}
|
||||
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
|
||||
}
|
||||
});
|
||||
if (sync) {
|
||||
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
|
||||
}
|
||||
co_await sst.sstable_write_io_check(remove_file, new_toc_name);
|
||||
}
|
||||
} catch (...) {
|
||||
// Log and ignore the failure since there is nothing much we can do about it at this point.
|
||||
// a. Compaction will retry deleting the sstable in the next pass, and
|
||||
|
||||
@@ -5200,7 +5200,8 @@ SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) {
|
||||
sstables_closed++;
|
||||
}));
|
||||
observers.push_back(sst->add_on_delete_handler([&] (sstable& sst) mutable {
|
||||
auto missing = !file_exists(sst.get_filename()).get();
|
||||
// ATTN -- the _on_delete callback is not necessarily running in a thread, so calling .get() on a future is not an option here
|
||||
auto missing = (::access(sst.get_filename().c_str(), F_OK) != 0);
|
||||
testlog.info("Deleting sstable of generation {}: missing={}", sst.generation(), missing);
|
||||
sstables_missing_on_delete += missing;
|
||||
}));
|
||||
|
||||
@@ -778,3 +778,53 @@ SEASTAR_THREAD_TEST_CASE(test_system_datadir_layout) {
|
||||
BOOST_REQUIRE(!file_exists(tbl_dirname.native()).get());
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Checks that processing an sstable directory with garbage collection enabled
// finishes an interrupted atomic deletion: sstables named in the pending
// deletion log must be gone afterwards, and only the untouched ones remain.
SEASTAR_TEST_CASE(test_pending_log_garbage_collection) {
    return sstables::test_env::do_with_sharded_async([] (auto& env) {
        // Creates one more sstable for this shard.
        auto make_one = [&env] {
            return make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())));
        };
        // Full path of the given component file of an sstable.
        auto component_path = [] (const shared_sstable& sst, sstables::component_type type) {
            return test(sst).filename(type).native();
        };

        std::vector<shared_sstable> ssts_to_keep;
        for (int n = 0; n < 2; n++) {
            ssts_to_keep.emplace_back(make_one());
        }
        std::vector<shared_sstable> ssts_to_remove;
        for (int n = 0; n < 3; n++) {
            ssts_to_remove.emplace_back(make_one());
        }

        // Simulate an atomic deletion that was interrupted half-way. The
        // pending deletion log covers all three sstables:
        //  - [0] is left intact,
        //  - [1] only has its TOC moved to the temporary-TOC,
        //  - [2] has its TOC moved and its Data component already removed.
        sstable_directory::create_pending_deletion_log(ssts_to_remove).get0();
        for (int idx = 1; idx <= 2; idx++) {
            rename_file(component_path(ssts_to_remove[idx], sstables::component_type::TOC),
                        component_path(ssts_to_remove[idx], sstables::component_type::TemporaryTOC)).get();
        }
        remove_file(component_path(ssts_to_remove[2], sstables::component_type::Data)).get();

        with_sstable_directory(env, [&] (sharded<sstables::sstable_directory>& sstdir) {
            auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true, .garbage_collect = true });
            BOOST_REQUIRE_NO_THROW(expect_ok.get());

            // Per-shard step: gather the generations of all sstables the directory sees.
            auto collect_generations = [] (auto& dir) {
                return do_with(std::set<sstables::generation_type>(), [&dir] (auto& seen) {
                    return dir.do_for_each_sstable([&seen] (const shared_sstable& sst) {
                        seen.emplace(sst->generation());
                        return make_ready_future<>();
                    }).then([&seen] () mutable -> future<std::set<sstables::generation_type>> {
                        return make_ready_future<std::set<sstables::generation_type>>(std::move(seen));
                    });
                });
            };
            // Reduction step: fold a shard's result into the accumulated set.
            auto merge_generations = [] (auto&& acc, auto&& part) {
                acc.merge(part);
                return std::move(acc);
            };
            auto collected = sstdir.map_reduce0(collect_generations, std::set<sstables::generation_type>(), merge_generations).get();

            // Only the sstables that were never scheduled for deletion may survive.
            std::set<sstables::generation_type> expected;
            for (auto& kept : ssts_to_keep) {
                expected.insert(kept->generation());
            }

            BOOST_REQUIRE_EQUAL(expected, collected);
        });
    });
}
|
||||
|
||||
Reference in New Issue
Block a user