diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index ccd3ded4a5..4334becb5a 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -345,8 +345,21 @@ future<> sstable_directory::system_keyspace_components_lister::commit() { } future<> sstable_directory::system_keyspace_components_lister::garbage_collect(storage& st) { - // FIXME -- implement - co_return; + return do_with(std::set(), [this, &st] (auto& gens_to_remove) { + return _sys_ks.sstables_registry_list(_location, [&st, &gens_to_remove] (utils::UUID uuid, sstring status, entry_descriptor desc) { + if (status == "sealed") { + return make_ready_future<>(); + } + + dirlog.info("Removing dangling {} {} entry", status, uuid); + gens_to_remove.insert(desc.generation); + return st.remove_by_registry_entry(uuid, std::move(desc)); + }).then([this, &gens_to_remove] { + return parallel_for_each(gens_to_remove, [this] (auto gen) { + return _sys_ks.sstables_registry_delete_entry(_location, gen); + }); + }); + }); } future<> diff --git a/sstables/storage.cc b/sstables/storage.cc index 54f22ef1ac..7e92e35778 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "sstables/exceptions.hh" @@ -72,6 +73,7 @@ public: virtual noncopyable_function(std::vector)> atomic_deleter() const override { return sstable_directory::delete_with_pending_deletion_log; } + virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override; virtual sstring prefix() const override { return _dir; } }; @@ -425,6 +427,10 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept { } } +future<> filesystem_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) { + on_internal_error(sstlog, "Filesystem storage doesn't keep its entries in registry"); +} + class s3_storage : public sstables::storage { shared_ptr _client; sstring _bucket; @@ -464,6 +470,7 @@ public: virtual noncopyable_function(std::vector)> atomic_deleter() const override { return delete_with_system_keyspace; } + virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override; virtual sstring prefix() const override { return _location; } }; @@ -538,6 +545,31 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept { co_await sys_ks.sstables_registry_delete_entry(_location, sst.generation()); } +future<> s3_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) { + auto prefix = format("/{}/{}", _bucket, uuid); + std::optional> toc; + std::vector components; + + try { + toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + } catch (seastar::httpd::unexpected_status_error& e) { + if (e.status() != seastar::http::reply::status_type::no_content) { + throw; + } + } + if (!toc) { + co_return; // missing TOC object is OK + } + + boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n")); + co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> { + if (comp != sstable_version_constants::TOC_SUFFIX) { + co_await _client->delete_object(prefix + "/" + comp); + } + }); + co_await _client->delete_object(prefix + "/" + sstable_version_constants::TOC_SUFFIX); +} + future<> s3_storage::delete_with_system_keyspace(std::vector ssts) { co_await coroutine::parallel_for_each(ssts, [] (shared_sstable sst) -> future<> { const s3_storage* storage = dynamic_cast(&sst->get_storage()); diff --git a/sstables/storage.hh b/sstables/storage.hh index 5e3c811f5f..74e0d3b7d1 100644 --- a/sstables/storage.hh +++ b/sstables/storage.hh @@ -30,6 +30,7 @@ enum class sstable_state; class delayed_commit_changes; class sstable; class sstables_manager; +class entry_descriptor; class storage { friend class test; @@ -62,6 +63,7 @@ public: virtual future make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) = 0; virtual future<> destroy(const sstable& sst) = 0; virtual noncopyable_function(std::vector)> atomic_deleter() const = 0; + virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) = 0; virtual sstring prefix() const = 0; };