sstable_directory: Garbage collect S3 sstables on reboot

When booting, there can be dangling entries in the sstables registry as
well as objects on the storage itself. This patch makes the S3 lister list
those entries and then kick the s3_storage to remove the corresponding
objects. At the end, the dangling entries are removed from the registry.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-09-07 16:37:45 +03:00
parent 6cb4e3d05a
commit 2c9ec6bc93
3 changed files with 49 additions and 2 deletions

View File

@@ -345,8 +345,21 @@ future<> sstable_directory::system_keyspace_components_lister::commit() {
}
// Cleans up dangling sstables recorded in the registry for _location.
//
// Works in two phases:
//  1. Lists the registry entries and, for every entry whose status is not
//     "sealed", asks the storage to remove the corresponding objects
//     (storage::remove_by_registry_entry) and remembers its generation.
//  2. After the listing has completed, deletes the remembered generations
//     from the registry.
//
// Registry entries are dropped only after the listing (including the storage
// removals issued from it) resolved successfully, so a failure while removing
// objects leaves the entries in place.
//
// st -- storage that owns the objects; it is captured by reference and is
//       expected to stay alive until the returned future resolves.
future<> sstable_directory::system_keyspace_components_lister::garbage_collect(storage& st) {
    // do_with() keeps the set of collected generations alive across both phases
    return do_with(std::set<generation_type>(), [this, &st] (auto& gens_to_remove) {
        return _sys_ks.sstables_registry_list(_location, [&st, &gens_to_remove] (utils::UUID uuid, sstring status, entry_descriptor desc) {
            if (status == "sealed") {
                // Sealed entries are not dangling, nothing to collect
                return make_ready_future<>();
            }

            dirlog.info("Removing dangling {} {} entry", status, uuid);
            // Read the generation before desc is moved into the storage call
            gens_to_remove.insert(desc.generation);
            return st.remove_by_registry_entry(uuid, std::move(desc));
        }).then([this, &gens_to_remove] {
            return parallel_for_each(gens_to_remove, [this] (auto gen) {
                return _sys_ks.sstables_registry_delete_entry(_location, gen);
            });
        });
    });
}
future<>

View File

@@ -14,6 +14,7 @@
#include <exception>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/http/exception.hh>
#include <seastar/util/file.hh>
#include "sstables/exceptions.hh"
@@ -72,6 +73,7 @@ public:
// Returns the callable used to delete a batch of sstables kept on this
// storage. This override hands out
// sstable_directory::delete_with_pending_deletion_log.
virtual noncopyable_function<future<>(std::vector<shared_sstable>)> atomic_deleter() const override {
return sstable_directory::delete_with_pending_deletion_log;
}
virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override;
virtual sstring prefix() const override { return _dir; }
};
@@ -425,6 +427,10 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
}
}
// Registry-driven removal is not applicable to this storage type: any call
// is reported via on_internal_error() and neither argument is inspected.
future<> filesystem_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) {
    static constexpr const char* not_supported = "Filesystem storage doesn't keep its entries in registry";
    on_internal_error(sstlog, not_supported);
}
class s3_storage : public sstables::storage {
shared_ptr<s3::client> _client;
sstring _bucket;
@@ -464,6 +470,7 @@ public:
// Returns the callable used to delete a batch of sstables kept on this
// storage. This override hands out delete_with_system_keyspace.
virtual noncopyable_function<future<>(std::vector<shared_sstable>)> atomic_deleter() const override {
return delete_with_system_keyspace;
}
virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override;
virtual sstring prefix() const override { return _location; }
};
@@ -538,6 +545,31 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept {
co_await sys_ks.sstables_registry_delete_entry(_location, sst.generation());
}
// Removes all objects of the sstable referenced by a registry entry.
//
// uuid -- identifier of the entry; the sstable's objects are addressed as
//         "/<bucket>/<uuid>/<component>"
// desc -- descriptor of the entry; only its version is used here, to resolve
//         the name of the TOC component
//
// The TOC object is fetched first and split into lines, each line naming one
// component object. All components except the TOC are deleted in parallel and
// the TOC itself is deleted last, so that an interrupted removal still leaves
// the TOC behind and the remaining objects can be found on the next attempt.
//
// If the TOC object does not exist there is nothing to enumerate and the call
// is a no-op. Any other failure to fetch the TOC, as well as failures to
// delete objects, propagate to the caller.
future<> s3_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) {
    auto prefix = format("/{}/{}", _bucket, uuid);
    std::optional<temporary_buffer<char>> toc;
    std::vector<sstring> components;

    try {
        toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC));
    } catch (const seastar::httpd::unexpected_status_error& e) {
        // S3 reports an absent key on GET with 404 Not Found. 204 No Content
        // is still tolerated to stay compatible with the previous check.
        const auto status = e.status();
        if (status != seastar::http::reply::status_type::not_found
                && status != seastar::http::reply::status_type::no_content) {
            throw;
        }
    }
    if (!toc) {
        co_return; // missing TOC object is OK
    }

    boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n"));
    co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> {
        // Splitting yields empty tokens (e.g. after a trailing newline); they
        // don't name any object. The TOC is handled separately below.
        if (!comp.empty() && comp != sstable_version_constants::TOC_SUFFIX) {
            co_await _client->delete_object(prefix + "/" + comp);
        }
    });
    // TOC goes last, after all the components it lists are gone
    co_await _client->delete_object(prefix + "/" + sstable_version_constants::TOC_SUFFIX);
}
future<> s3_storage::delete_with_system_keyspace(std::vector<shared_sstable> ssts) {
co_await coroutine::parallel_for_each(ssts, [] (shared_sstable sst) -> future<> {
const s3_storage* storage = dynamic_cast<const s3_storage*>(&sst->get_storage());

View File

@@ -30,6 +30,7 @@ enum class sstable_state;
class delayed_commit_changes;
class sstable;
class sstables_manager;
class entry_descriptor;
class storage {
friend class test;
@@ -62,6 +63,7 @@ public:
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) = 0;
virtual future<> destroy(const sstable& sst) = 0;
virtual noncopyable_function<future<>(std::vector<shared_sstable>)> atomic_deleter() const = 0;
// Removes whatever the storage keeps for the sstable described by the given
// sstables registry entry (its uuid and descriptor). Used when garbage
// collecting dangling registry entries; storage types that don't keep their
// entries in the registry are not expected to be called.
virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) = 0;
virtual sstring prefix() const = 0;
};