sstable_directory: Garbage collect S3 sstables on reboot
When booting, there can be dangling entries in the sstables registry as well as dangling objects on the storage itself. This patch makes the S3 lister list those entries and then kick the s3_storage to remove the corresponding objects. At the end, the dangling entries are removed from the registry.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -345,8 +345,21 @@ future<> sstable_directory::system_keyspace_components_lister::commit() {
|
||||
}
|
||||
|
||||
// Cleans up registry entries (and their storage objects) that a previous run
// left behind for this lister's _location.
//
// The sstables registry is listed for _location. Entries whose status is
// "sealed" are left untouched. Any other entry is considered dangling: it is
// logged, `st` is asked to remove the objects that belong to it, and its
// generation is remembered. Once the listing future resolves, the remembered
// generations are deleted from the registry in parallel. The registry
// deletion is chained with .then(), so it is not reached if the listing (or
// an object removal it waits for) fails.
//
// `st` is captured by reference; the caller must keep it alive until the
// returned future resolves.
future<> sstable_directory::system_keyspace_components_lister::garbage_collect(storage& st) {
    // do_with() owns the set for the lifetime of the whole continuation chain,
    // so the inner lambdas can safely hold references to it.
    return do_with(std::set<generation_type>(), [this, &st] (auto& gens_to_remove) {
        return _sys_ks.sstables_registry_list(_location, [&st, &gens_to_remove] (utils::UUID uuid, sstring status, entry_descriptor desc) {
            if (status == "sealed") {
                return make_ready_future<>();
            }

            dirlog.info("Removing dangling {} {} entry", status, uuid);
            // Record the generation before desc is moved into the storage call.
            gens_to_remove.insert(desc.generation);
            return st.remove_by_registry_entry(uuid, std::move(desc));
        }).then([this, &gens_to_remove] {
            return parallel_for_each(gens_to_remove, [this] (auto gen) {
                return _sys_ks.sstables_registry_delete_entry(_location, gen);
            });
        });
    });
}
|
||||
|
||||
future<>
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <exception>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/http/exception.hh>
|
||||
#include <seastar/util/file.hh>
|
||||
|
||||
#include "sstables/exceptions.hh"
|
||||
@@ -72,6 +73,7 @@ public:
|
||||
// Returns the callable used to delete a batch of sstables atomically. For
// filesystem storage this is sstable_directory::delete_with_pending_deletion_log.
virtual noncopyable_function<future<>(std::vector<shared_sstable>)> atomic_deleter() const override {
|
||||
return sstable_directory::delete_with_pending_deletion_log;
|
||||
}
|
||||
virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override;
|
||||
|
||||
// Returns this storage's prefix, which for filesystem storage is its _dir member.
virtual sstring prefix() const override { return _dir; }
|
||||
};
|
||||
@@ -425,6 +427,10 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
// Registry-driven removal does not apply to filesystem storage: as the error
// text states, this storage does not keep its entries in the registry. Any
// call is therefore reported through on_internal_error() and both arguments
// are ignored.
future<> filesystem_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) {
    constexpr std::string_view reason = "Filesystem storage doesn't keep its entries in registry";
    on_internal_error(sstlog, reason);
}
|
||||
|
||||
class s3_storage : public sstables::storage {
|
||||
shared_ptr<s3::client> _client;
|
||||
sstring _bucket;
|
||||
@@ -464,6 +470,7 @@ public:
|
||||
// Returns the callable used to delete a batch of sstables atomically. For S3
// storage this is delete_with_system_keyspace.
virtual noncopyable_function<future<>(std::vector<shared_sstable>)> atomic_deleter() const override {
|
||||
return delete_with_system_keyspace;
|
||||
}
|
||||
virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) override;
|
||||
|
||||
// Returns this storage's prefix, which for S3 storage is its _location member.
virtual sstring prefix() const override { return _location; }
|
||||
};
|
||||
@@ -538,6 +545,31 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept {
|
||||
co_await sys_ks.sstables_registry_delete_entry(_location, sst.generation());
|
||||
}
|
||||
|
||||
// Removes the S3 objects that belong to a single sstables registry entry.
//
// Objects are addressed as "/<bucket>/<uuid>/<component>". The TOC object
// (named according to desc.version) is downloaded first and used as the list
// of components to delete:
//   - if the TOC object does not exist there is nothing to enumerate and the
//     call completes successfully;
//   - otherwise every component named in the TOC is deleted in parallel, and
//     the TOC object itself is deleted last (presumably so that an interrupted
//     removal still has a TOC to drive a retry -- confirm with the author).
//
// Any other HTTP failure while fetching the TOC, and any failure while
// deleting objects, is propagated through the returned future.
future<> s3_storage::remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) {
    auto prefix = format("/{}/{}", _bucket, uuid);
    std::optional<temporary_buffer<char>> toc;
    std::vector<sstring> components;

    try {
        toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC));
    } catch (const seastar::httpd::unexpected_status_error& e) {
        // S3 answers GET for a missing key with 404 Not Found. 204 No Content
        // is still tolerated to stay compatible with the previous check.
        const auto status = e.status();
        if (status != seastar::http::reply::status_type::not_found
                && status != seastar::http::reply::status_type::no_content) {
            throw;
        }
    }
    if (!toc) {
        co_return; // missing TOC object is OK
    }

    boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n"));
    co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> {
        // Splitting on '\n' produces empty tokens (e.g. after a trailing
        // newline); skip them instead of issuing a DELETE for "<prefix>/".
        // The TOC itself is handled separately below.
        if (!comp.empty() && comp != sstable_version_constants::TOC_SUFFIX) {
            co_await _client->delete_object(prefix + "/" + comp);
        }
    });
    co_await _client->delete_object(prefix + "/" + sstable_version_constants::TOC_SUFFIX);
}
|
||||
|
||||
future<> s3_storage::delete_with_system_keyspace(std::vector<shared_sstable> ssts) {
|
||||
co_await coroutine::parallel_for_each(ssts, [] (shared_sstable sst) -> future<> {
|
||||
const s3_storage* storage = dynamic_cast<const s3_storage*>(&sst->get_storage());
|
||||
|
||||
@@ -30,6 +30,7 @@ enum class sstable_state;
|
||||
class delayed_commit_changes;
|
||||
class sstable;
|
||||
class sstables_manager;
|
||||
class entry_descriptor;
|
||||
|
||||
class storage {
|
||||
friend class test;
|
||||
@@ -62,6 +63,7 @@ public:
|
||||
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) = 0;
|
||||
virtual future<> destroy(const sstable& sst) = 0;
|
||||
virtual noncopyable_function<future<>(std::vector<shared_sstable>)> atomic_deleter() const = 0;
|
||||
// Removes the stored objects that belong to the sstables registry entry
// identified by `uuid` and described by `desc`. Called by the registry-based
// lister's garbage_collect() for entries whose status is not "sealed".
virtual future<> remove_by_registry_entry(utils::UUID uuid, entry_descriptor desc) = 0;
|
||||
|
||||
virtual sstring prefix() const = 0;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user