diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 3969eb0df4..44efcd4082 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -545,8 +545,8 @@ bool sstable_directory::compare_sstable_storage_prefix(const sstring& prefix_a, return size_a == size_b && sstring::traits_type::compare(prefix_a.begin(), prefix_b.begin(), size_a) == 0; } -future<> sstable_directory::delete_with_pending_deletion_log(std::vector ssts) { - return seastar::async([ssts = std::move(ssts)] { +future> sstable_directory::create_pending_deletion_log(const std::vector& ssts) { + return seastar::async([&ssts] { shared_sstable first = nullptr; min_max_tracker gen_tracker; @@ -590,35 +590,41 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vectorunlink(sstables::storage::sync_dir::no); - }).get(); - - sync_directory(first->_storage->prefix()).get(); - - // Once all sstables are deleted, the log file can be removed. - // Note: the log file will be removed also if unlink failed to remove - // any sstable and ignored the error. - try { - remove_file(pending_delete_log).get(); - sstlog.debug("{} removed.", pending_delete_log); - } catch (...) { - sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); - } + return std::make_pair(std::move(pending_delete_log), first->_storage->prefix()); }); } +future<> sstable_directory::delete_with_pending_deletion_log(std::vector ssts) { + auto [ pending_delete_log, sst_directory] = co_await create_pending_deletion_log(ssts); + + co_await coroutine::parallel_for_each(ssts, [] (shared_sstable sst) { + return sst->unlink(sstables::storage::sync_dir::no); + }); + co_await sync_directory(sst_directory); + + // Once all sstables are deleted, the log file can be removed. + // Note: the log file will be removed also if unlink failed to remove + // any sstable and ignored the error. + try { + co_await remove_file(pending_delete_log); + sstlog.debug("{} removed.", pending_delete_log); + } catch (...) { + sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); + } +} + // FIXME: Go through maybe_delete_large_partitions_entry on recovery since // this is an indication we crashed in the middle of delete_with_pending_deletion_log future<> sstable_directory::filesystem_components_lister::replay_pending_delete_log(fs::path pending_delete_log) { @@ -633,7 +639,8 @@ future<> sstable_directory::filesystem_components_lister::replay_pending_delete_ boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on); auto tocs = boost::copy_range>(basenames | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); })); co_await parallel_for_each(tocs, [&sstdir] (const sstring& name) { - return remove_by_toc_name(sstdir + "/" + name); + // Only move TOC to TOC.tmp, the rest will be finished by regular process + return make_toc_temporary(sstdir + "/" + name).discard_result(); }); sstlog.debug("Replayed {}, removing", pending_delete_log); co_await remove_file(pending_delete_log.native()); diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 6613c18683..18b6d1df42 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -277,6 +277,10 @@ public: // // This function only solves the second problem for now. static future<> delete_with_pending_deletion_log(std::vector ssts); + // Creates the deletion log for atomic deletion of sstables (helper for the + // above function that's also used by tests) + // Returns a pair of "logilfe name" and "directory with sstables" + static future> create_pending_deletion_log(const std::vector& ssts); static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept; }; diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 49dbe33d91..d02f89e183 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2601,7 +2601,7 @@ std::optional> sstable::get_sample_indexes_for_ran return std::nullopt; } -future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) { +future make_toc_temporary(sstring sstable_toc_name, storage::sync_dir sync) { sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size()); sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX; @@ -2615,37 +2615,10 @@ future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) { } else { if (!co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name)) { sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name); - co_return; + co_return ""; } } - auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro); - std::vector components = co_await with_closeable(std::move(toc_file), [] (file& toc_file) { - return sstable::read_and_parse_toc(toc_file); - }); - - co_await coroutine::parallel_for_each(components, [&prefix] (sstring component) -> future<> { - if (component.empty()) { - // eof - co_return; - } - if (component == sstable_version_constants::TOC_SUFFIX) { - // already renamed - co_return; - } - auto fname = prefix + component; - try { - co_await sstable_io_check(sstable_write_error_handler, remove_file, fname); - } catch (...) { - if (!is_system_error_errno(ENOENT)) { - throw; - } - sstlog.debug("Forgiving ENOENT when deleting file {}", fname); - } - }); - if (sync) { - co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name)); - } - co_await sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name); + co_return new_toc_name; } /** diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 528eaa4602..7a0aa9c388 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -1017,9 +1017,10 @@ public: // swallows all errors and just reports them to the log. future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir); -// similar to sstable::unlink, but works on a TOC file name -// Caller may pass sync_dir::no for batching multiple deletes in the same directory, -// and make sure the directory is sync'ed on or after the last call. -future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes); +// makes sure the TOC file is temporary by moving existing TOC file or otherwise +// checking the temporary-TOC already exists +// resolves into temporary-TOC file name or empty string if neither TOC nor temp. +// TOC is there +future make_toc_temporary(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes); } // namespace sstables diff --git a/sstables/storage.cc b/sstables/storage.cc index ad86c5c05d..d19ba7d4e2 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -399,6 +399,10 @@ future<> filesystem_storage::change_state(const sstable& sst, sstable_state stat co_await move(sst, path.native(), std::move(new_generation), delay_commit); } +static inline fs::path parent_path(const sstring& fname) { + return fs::canonical(fs::path(fname)).parent_path(); +} + future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept { // We must be able to generate toc_filename() // in order to delete the sstable. @@ -409,7 +413,31 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept { }(); try { - co_await remove_by_toc_name(name, sync); + auto new_toc_name = co_await make_toc_temporary(name, sync); + if (!new_toc_name.empty()) { + auto dir_name = parent_path(new_toc_name); + + co_await coroutine::parallel_for_each(sst.all_components(), [&sst, &dir_name] (auto component) -> future<> { + if (component.first == component_type::TOC) { + // already renamed + co_return; + } + + auto fname = sstable::filename(dir_name.native(), sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, component.second); + try { + co_await sst.sstable_write_io_check(remove_file, fname); + } catch (...) { + if (!is_system_error_errno(ENOENT)) { + throw; + } + sstlog.debug("Forgiving ENOENT when deleting file {}", fname); + } + }); + if (sync) { + co_await sst.sstable_write_io_check(sync_directory, dir_name.native()); + } + co_await sst.sstable_write_io_check(remove_file, new_toc_name); + } } catch (...) { // Log and ignore the failure since there is nothing much we can do about it at this point. // a. Compaction will retry deleting the sstable in the next pass, and diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 3cbc6ff126..a225e4578b 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -5200,7 +5200,8 @@ SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) { sstables_closed++; })); observers.push_back(sst->add_on_delete_handler([&] (sstable& sst) mutable { - auto missing = !file_exists(sst.get_filename()).get(); + // ATTN -- the _on_delete callback is not necessarily running in thread + auto missing = (::access(sst.get_filename().c_str(), F_OK) != 0); testlog.info("Deleting sstable of generation {}: missing={}", sst.generation(), missing); sstables_missing_on_delete += missing; })); diff --git a/test/boost/sstable_directory_test.cc b/test/boost/sstable_directory_test.cc index 1ebdaebb3c..d043301577 100644 --- a/test/boost/sstable_directory_test.cc +++ b/test/boost/sstable_directory_test.cc @@ -778,3 +778,53 @@ SEASTAR_THREAD_TEST_CASE(test_system_datadir_layout) { BOOST_REQUIRE(!file_exists(tbl_dirname.native()).get()); }, cfg).get(); } + +SEASTAR_TEST_CASE(test_pending_log_garbage_collection) { + return sstables::test_env::do_with_sharded_async([] (auto& env) { + std::vector ssts_to_keep; + for (int i = 0; i < 2; i++) { + ssts_to_keep.emplace_back(make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())))); + } + std::vector ssts_to_remove; + for (int i = 0; i < 3; i++) { + ssts_to_remove.emplace_back(make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())))); + } + + // Now start atomic deletion -- create the pending deletion log for all + // three sstables, move TOC file for one of them into temporary-TOC, and + // partially delete another + sstable_directory::create_pending_deletion_log(ssts_to_remove).get0(); + rename_file(test(ssts_to_remove[1]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[1]).filename(sstables::component_type::TemporaryTOC).native()).get(); + rename_file(test(ssts_to_remove[2]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[2]).filename(sstables::component_type::TemporaryTOC).native()).get(); + remove_file(test(ssts_to_remove[2]).filename(sstables::component_type::Data).native()).get(); + + with_sstable_directory(env, [&] (sharded& sstdir) { + auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true, .garbage_collect = true }); + BOOST_REQUIRE_NO_THROW(expect_ok.get()); + + auto collected = sstdir.map_reduce0( + [] (auto& sstdir) { + return do_with(std::set(), [&sstdir] (auto& gens) { + return sstdir.do_for_each_sstable([&] (const shared_sstable& sst) { + gens.emplace(sst->generation()); + return make_ready_future<>(); + }).then([&gens] () mutable -> future> { + return make_ready_future>(std::move(gens));; + }); + }); + }, std::set(), + [] (auto&& res, auto&& gens) { + res.merge(gens); + return std::move(res); + } + ).get(); + + std::set expected; + for (auto& sst : ssts_to_keep) { + expected.insert(sst->generation()); + } + + BOOST_REQUIRE_EQUAL(expected, collected); + }); + }); +}