From 2c9ec6bc93d9dec178449ef45bbbb7aae0b37467 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 7 Sep 2023 16:37:45 +0300 Subject: [PATCH] sstable_directory: Garbage collect S3 sstables on reboot When booting there can be dangling entries in sstables registry as well as objects on the storage itself. This patch makes the S3 lister list those entries and then kick the s3_storage to remove the corresponding objects. At the end the dangling entries are removed from the registry Signed-off-by: Pavel Emelyanov --- sstables/sstable_directory.cc | 17 +++++++++++++++-- sstables/storage.cc | 32 ++++++++++++++++++++++++++++++++ sstables/storage.hh | 2 ++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index ccd3ded4a5..4334becb5a 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -345,8 +345,21 @@ future<> sstable_directory::system_keyspace_components_lister::commit() { } future<> sstable_directory::system_keyspace_components_lister::garbage_collect(storage& st) { - // FIXME -- implement - co_return; + return do_with(std::set(), [this, &st] (auto& gens_to_remove) { + return _sys_ks.sstables_registry_list(_location, [&st, &gens_to_remove] (utils::UUID uuid, sstring status, entry_descriptor desc) { + if (status == "sealed") { + return make_ready_future<>(); + } + + dirlog.info("Removing dangling {} {} entry", status, uuid); + gens_to_remove.insert(desc.generation); + return st.remove_by_registry_entry(uuid, std::move(desc)); + }).then([this, &gens_to_remove] { + return parallel_for_each(gens_to_remove, [this] (auto gen) { + return _sys_ks.sstables_registry_delete_entry(_location, gen); + }); + }); + }); } future<> diff --git a/sstables/storage.cc b/sstables/storage.cc index 54f22ef1ac..7e92e35778 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "sstables/exceptions.hh" @@ -72,6 +73,7 @@ public: virtual noncopyable_function(std::vector)> atomic_deleter() const override { return sstable_directory::delete_with_pending_deletion_log; } + virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override; virtual sstring prefix() const override { return _dir; } }; @@ -425,6 +427,10 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept { } } +future<> filesystem_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) { + on_internal_error(sstlog, "Filesystem storage doesn't keep its entries in registry"); +} + class s3_storage : public sstables::storage { shared_ptr _client; sstring _bucket; @@ -464,6 +470,7 @@ public: virtual noncopyable_function(std::vector)> atomic_deleter() const override { return delete_with_system_keyspace; } + virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override; virtual sstring prefix() const override { return _location; } }; @@ -538,6 +545,31 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept { co_await sys_ks.sstables_registry_delete_entry(_location, sst.generation()); } +future<> s3_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) { + auto prefix = format("/{}/{}", _bucket, uuid); + std::optional> toc; + std::vector components; + + try { + toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + } catch (seastar::httpd::unexpected_status_error& e) { + if (e.status() != seastar::http::reply::status_type::no_content) { + throw; + } + } + if (!toc) { + co_return; // missing TOC object is OK + } + + boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n")); + co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> { + if (comp != sstable_version_constants::TOC_SUFFIX) { + co_await _client->delete_object(prefix + "/" + comp); + } + }); + co_await _client->delete_object(prefix + "/" + sstable_version_constants::TOC_SUFFIX); +} + future<> s3_storage::delete_with_system_keyspace(std::vector ssts) { co_await coroutine::parallel_for_each(ssts, [] (shared_sstable sst) -> future<> { const s3_storage* storage = dynamic_cast(&sst->get_storage()); diff --git a/sstables/storage.hh b/sstables/storage.hh index 5e3c811f5f..74e0d3b7d1 100644 --- a/sstables/storage.hh +++ b/sstables/storage.hh @@ -30,6 +30,7 @@ enum class sstable_state; class delayed_commit_changes; class sstable; class sstables_manager; +class entry_descriptor; class storage { friend class test; @@ -62,6 +63,7 @@ public: virtual future make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) = 0; virtual future<> destroy(const sstable& sst) = 0; virtual noncopyable_function(std::vector)> atomic_deleter() const = 0; + virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) = 0; virtual sstring prefix() const = 0; };