Replaces direct s3 interfaces with the abstraction layer, and open for having multiple implentations/backends
764 lines
34 KiB
C++
764 lines
34 KiB
C++
/*
|
|
* Copyright (C) 2020-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <type_traits>
|
|
#include <fmt/ranges.h>
|
|
#include <fmt/std.h>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
#include <seastar/util/file.hh>
|
|
#include <seastar/util/lazy.hh>
|
|
#include <boost/algorithm/string.hpp>
|
|
#include "sstables/sstable_directory.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "sstables/sstables_manager.hh"
|
|
#include "sstables/exceptions.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "utils/log.hh"
|
|
#include "sstable_directory.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/lister.hh"
|
|
#include "utils/overloaded_functor.hh"
|
|
#include "utils/directories.hh"
|
|
#include "utils/s3/client.hh"
|
|
#include "replica/database.hh"
|
|
#include "dht/auto_refreshing_sharder.hh"
|
|
|
|
static logging::logger dirlog("sstable_directory");
|
|
|
|
namespace sstables {
|
|
|
|
bool manifest_json_filter(const fs::path&, const directory_entry& entry) {
|
|
// Filter out directories. If type of the entry is unknown - check its name.
|
|
if (entry.type.value_or(directory_entry_type::regular) != directory_entry_type::directory && (entry.name == "manifest.json" || entry.name == "schema.cql")) {
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
sstable_directory::filesystem_components_lister::filesystem_components_lister(std::filesystem::path dir)
|
|
: _directory(dir)
|
|
, _state(std::make_unique<scan_state>())
|
|
{
|
|
}
|
|
|
|
sstable_directory::filesystem_components_lister::filesystem_components_lister(std::filesystem::path dir, sstables_manager& mgr, const data_dictionary::storage_options::object_storage& os)
|
|
: _directory(dir)
|
|
, _state(std::make_unique<scan_state>())
|
|
, _client(mgr.get_endpoint_client(os.endpoint))
|
|
, _bucket(os.bucket)
|
|
{
|
|
}
|
|
|
|
sstable_directory::sstables_registry_components_lister::sstables_registry_components_lister(sstables::sstables_registry& sstables_registry, table_id owner)
|
|
: _sstables_registry(sstables_registry)
|
|
, _owner(std::move(owner))
|
|
{
|
|
}
|
|
|
|
sstable_directory::restore_components_lister::restore_components_lister(const data_dictionary::storage_options::value_type& options,
|
|
std::vector<sstring> toc_filenames)
|
|
: _toc_filenames(std::move(toc_filenames))
|
|
{
|
|
}
|
|
|
|
std::unique_ptr<sstable_directory::components_lister>
|
|
sstable_directory::make_components_lister() {
|
|
return std::visit(overloaded_functor {
|
|
[this] (const data_dictionary::storage_options::local& loc) mutable -> std::unique_ptr<sstable_directory::components_lister> {
|
|
if (loc.dir.empty()) {
|
|
on_internal_error(dirlog, "Local storage options is missing 'dir'");
|
|
}
|
|
return std::make_unique<sstable_directory::filesystem_components_lister>(make_path(loc.dir.native(), _state));
|
|
},
|
|
[this] (const data_dictionary::storage_options::object_storage& os) mutable -> std::unique_ptr<sstable_directory::components_lister> {
|
|
return std::visit(overloaded_functor {
|
|
[this, &os] (const sstring& prefix) -> std::unique_ptr<sstable_directory::components_lister> {
|
|
if (prefix.empty()) {
|
|
on_internal_error(sstlog, fmt::format("{} storage options is missing 'prefix'", os.type));
|
|
}
|
|
return std::make_unique<sstable_directory::filesystem_components_lister>(fs::path(prefix), _manager, os);
|
|
},
|
|
[this, &os] (const table_id& owner) -> std::unique_ptr<sstable_directory::components_lister> {
|
|
if (owner.id.is_null()) {
|
|
on_internal_error(sstlog, fmt::format("{} storage options is missing 'owner'", os.type));
|
|
}
|
|
return std::make_unique<sstable_directory::sstables_registry_components_lister>(_manager.sstables_registry(), owner);
|
|
}
|
|
}, os.location);
|
|
}
|
|
}, _storage_opts->value);
|
|
}
|
|
|
|
sstable_directory::sstable_directory(replica::table& table,
|
|
sstable_state state,
|
|
io_error_handler_gen error_handler_gen)
|
|
: sstable_directory(
|
|
table.get_sstables_manager(),
|
|
table.schema(),
|
|
std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this()),
|
|
table.get_storage_options_ptr(),
|
|
std::move(state),
|
|
std::move(error_handler_gen)
|
|
)
|
|
{}
|
|
|
|
sstable_directory::sstable_directory(replica::table& table,
|
|
sstable_state state,
|
|
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
|
|
io_error_handler_gen error_handler_gen)
|
|
: sstable_directory(
|
|
table.get_sstables_manager(),
|
|
table.schema(),
|
|
std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this()),
|
|
std::move(storage_opts),
|
|
state,
|
|
std::move(error_handler_gen)
|
|
)
|
|
{}
|
|
|
|
sstable_directory::sstable_directory(replica::table& table,
|
|
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
|
|
std::vector<sstring> sstables,
|
|
io_error_handler_gen error_handler_gen)
|
|
: _manager(table.get_sstables_manager())
|
|
, _schema(table.schema())
|
|
, _storage_opts(std::move(storage_opts))
|
|
, _state(sstable_state::upload)
|
|
, _error_handler_gen(error_handler_gen)
|
|
, _storage(make_storage(_manager, *_storage_opts, _state))
|
|
, _lister(std::make_unique<sstable_directory::restore_components_lister>(_storage_opts->value,
|
|
std::move(sstables)))
|
|
, _sharder_ptr(std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this()))
|
|
, _sharder(*_sharder_ptr)
|
|
, _unshared_remote_sstables(smp::count)
|
|
{}
|
|
|
|
sstable_directory::sstable_directory(sstables_manager& manager,
|
|
schema_ptr schema,
|
|
const dht::sharder& sharder,
|
|
sstring table_dir,
|
|
sstable_state state,
|
|
io_error_handler_gen error_handler_gen)
|
|
: sstable_directory(
|
|
manager,
|
|
std::move(schema),
|
|
&sharder,
|
|
make_lw_shared<const data_dictionary::storage_options>(data_dictionary::make_local_options(fs::path(table_dir))),
|
|
state,
|
|
std::move(error_handler_gen)
|
|
)
|
|
{}
|
|
|
|
using unique_sharder_ptr = std::unique_ptr<dht::sharder>;
|
|
|
|
sstable_directory::sstable_directory(sstables_manager& manager,
|
|
schema_ptr schema,
|
|
std::variant<unique_sharder_ptr, const dht::sharder*> sharder,
|
|
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
|
|
sstable_state state,
|
|
io_error_handler_gen error_handler_gen)
|
|
: _manager(manager)
|
|
, _schema(std::move(schema))
|
|
, _storage_opts(std::move(storage_opts))
|
|
, _state(state)
|
|
, _error_handler_gen(error_handler_gen)
|
|
, _storage(make_storage(_manager, *_storage_opts, _state))
|
|
, _lister(make_components_lister())
|
|
, _sharder_ptr(std::holds_alternative<unique_sharder_ptr>(sharder) ? std::move(std::get<unique_sharder_ptr>(sharder)) : nullptr)
|
|
, _sharder(_sharder_ptr ? *_sharder_ptr : *std::get<const dht::sharder*>(sharder))
|
|
, _unshared_remote_sstables(smp::count)
|
|
{}
|
|
|
|
void sstable_directory::filesystem_components_lister::handle(sstables::entry_descriptor desc, fs::path filename) {
|
|
// TODO: decorate sstable_directory with some noncopyable_function<shard_id (generation_type)>
|
|
// to communicate how different tables place sstables into shards.
|
|
if (!sstables::sstable_generation_generator::maybe_owned_by_this_shard(desc.generation)) {
|
|
return;
|
|
}
|
|
|
|
dirlog.trace("for SSTable directory, scanning {}", filename);
|
|
auto generations_found_it = _state->generations_found.emplace(desc.generation, filename);
|
|
|
|
switch (desc.component) {
|
|
case component_type::TemporaryStatistics:
|
|
// We generate TemporaryStatistics when we rewrite the Statistics file,
|
|
// for instance on mutate_level. We should delete it - so we mark it for deletion
|
|
// here, but just the component. The old statistics file should still be there
|
|
// and we'll go with it.
|
|
_state->files_for_removal.insert(filename.native());
|
|
break;
|
|
case component_type::TemporaryHashes:
|
|
// We generate TemporaryHashes when writing the sstable,
|
|
// and it's removed before the sstable is sealed.
|
|
// If it's present, then it's a leftover from a partially-written sstable,
|
|
// and should be removed.
|
|
// This file isn't included in the TOC, so we can't remove on the "usual"
|
|
// mechanism for partially-written components, and instead we have to explicitly
|
|
// mark it for removal here.
|
|
_state->generations_found.erase(generations_found_it);
|
|
_state->files_for_removal.insert(filename.native());
|
|
break;
|
|
case component_type::TOC:
|
|
_state->descriptors.emplace(desc.generation, std::move(desc));
|
|
break;
|
|
case component_type::TemporaryTOC:
|
|
_state->temp_toc_found.push_back(std::move(desc));
|
|
break;
|
|
default:
|
|
// Do nothing, and will validate when trying to load the file.
|
|
break;
|
|
}
|
|
}
|
|
|
|
void sstable_directory::validate(sstables::shared_sstable sst, process_flags flags) const {
|
|
schema_ptr s = sst->get_schema();
|
|
if (s->is_counter() && !sst->has_scylla_component()) {
|
|
sstring error = "Direct loading non-Scylla SSTables containing counters is not supported.";
|
|
if (flags.enable_dangerous_direct_import_of_cassandra_counters) {
|
|
dirlog.info("{} But trying to continue on user's request.", error);
|
|
} else {
|
|
dirlog.error("{} Use sstableloader instead.", error);
|
|
throw std::runtime_error(fmt::format("{} Use sstableloader instead.", error));
|
|
}
|
|
}
|
|
if (s->is_view() && !flags.allow_loading_materialized_view) {
|
|
throw std::runtime_error("Loading Materialized View SSTables is not supported. Re-create the view instead.");
|
|
}
|
|
if (!sst->is_uploaded()) {
|
|
sst->validate_originating_host_id();
|
|
}
|
|
}
|
|
|
|
future<sstables::shared_sstable> sstable_directory::load_sstable(sstables::entry_descriptor desc,
|
|
const data_dictionary::storage_options& storage_opts, sstables::sstable_open_config cfg) const {
|
|
shared_sstable sst = _manager.make_sstable(_schema, storage_opts, desc.generation, _state, desc.version, desc.format, db_clock::now(), _error_handler_gen);
|
|
co_await sst->load(_sharder, cfg);
|
|
co_return sst;
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::process_descriptor(sstables::entry_descriptor desc,
|
|
process_flags flags,
|
|
noncopyable_function<data_dictionary::storage_options()>&& get_storage_options) {
|
|
if (desc.version > _max_version_seen) {
|
|
_max_version_seen = desc.version;
|
|
}
|
|
|
|
auto storage_opts = get_storage_options();
|
|
auto shards = co_await get_shards_for_this_sstable(desc, storage_opts, flags);
|
|
if (flags.sort_sstables_according_to_owner && shards.size() == 1 && shards[0] != this_shard_id()) {
|
|
// identified a remote unshared sstable
|
|
dirlog.trace("{} identified as a remote unshared SSTable, shard={}", seastar::value_of([this, &desc] {
|
|
return sstable::component_basename(_schema->ks_name(), _schema->cf_name(),
|
|
desc.version, desc.generation, desc.format, component_type::Data);
|
|
}), shards[0]);
|
|
_unshared_remote_sstables[shards[0]].push_back(std::move(desc));
|
|
co_return;
|
|
}
|
|
|
|
auto sst = co_await load_sstable(desc, storage_opts, flags.sstable_open_config);
|
|
validate(sst, flags);
|
|
|
|
if (flags.need_mutate_level) {
|
|
dirlog.trace("Mutating {} to level 0\n", sst->get_filename());
|
|
co_await sst->mutate_sstable_level(0);
|
|
}
|
|
|
|
if (flags.sort_sstables_according_to_owner) {
|
|
if (shards.size() == 1) {
|
|
dirlog.trace("{} identified as a local unshared SSTable", sst->get_filename());
|
|
_unshared_local_sstables.push_back(std::move(sst));
|
|
} else {
|
|
dirlog.trace("{} identified as a shared SSTable, shards={}", sst->get_filename(), shards);
|
|
_shared_sstable_info.push_back(co_await sst->get_open_info());
|
|
}
|
|
} else {
|
|
dirlog.debug("Added {} to unsorted sstables list", sst->get_filename());
|
|
_unsorted_sstables.push_back(std::move(sst));
|
|
}
|
|
}
|
|
|
|
sstables::sstable_version_types
|
|
sstable_directory::highest_version_seen() const {
|
|
return _max_version_seen;
|
|
}
|
|
|
|
future<> sstable_directory::prepare(process_flags flags) {
|
|
return _lister->prepare(*this, flags, *_storage);
|
|
}
|
|
|
|
future<> sstable_directory::filesystem_components_lister::prepare(sstable_directory& dir, process_flags flags, storage& st) {
|
|
if (_client) {
|
|
co_return;
|
|
}
|
|
|
|
if (dir._state == sstable_state::quarantine) {
|
|
if (!co_await file_exists(_directory.native())) {
|
|
co_return;
|
|
}
|
|
}
|
|
|
|
// verify owner and mode on the sstables directory
|
|
// and all its subdirectories, except for "snapshots"
|
|
// as there could be a race with scylla-manager that might
|
|
// delete snapshots concurrently
|
|
co_await utils::directories::verify_owner_and_mode(_directory, utils::directories::recursive::no);
|
|
|
|
directory_lister lister(_directory, lister::dir_entry_types::of<directory_entry_type::directory>());
|
|
co_await with_closeable(std::move(lister), [this] (directory_lister& lister) -> future<> {
|
|
while (auto de = co_await lister.get()) {
|
|
if (de->name != sstables::snapshots_dir) {
|
|
co_await utils::directories::verify_owner_and_mode(_directory / de->name, utils::directories::recursive::yes);
|
|
}
|
|
}
|
|
});
|
|
|
|
if (flags.garbage_collect) {
|
|
co_await garbage_collect(st);
|
|
}
|
|
}
|
|
|
|
future<> sstable_directory::sstables_registry_components_lister::prepare(sstable_directory& dir, process_flags flags, storage& st) {
|
|
if (flags.garbage_collect) {
|
|
co_await garbage_collect(st);
|
|
}
|
|
}
|
|
|
|
future<> sstable_directory::restore_components_lister::prepare(sstable_directory& dir, process_flags flags, storage& st) {
|
|
return make_ready_future();
|
|
}
|
|
|
|
future<> sstable_directory::process_sstable_dir(process_flags flags) {
|
|
return _lister->process(*this, flags);
|
|
}
|
|
|
|
future<> sstable_directory::filesystem_components_lister::process(sstable_directory& directory, process_flags flags) {
|
|
if (directory._state == sstable_state::quarantine) {
|
|
if (!co_await file_exists(_directory.native())) {
|
|
co_return;
|
|
}
|
|
}
|
|
|
|
dirlog.debug("Start processing directory {} for SSTables", _directory);
|
|
// It seems wasteful that each shard is repeating this scan, and to some extent it is.
|
|
// However, we still want to open the files and especially call process_dir() in a distributed
|
|
// fashion not to overload any shard. Also in the common case the SSTables will all be
|
|
// unshared and be on the right shard based on their generation number. In light of that there are
|
|
// two advantages of having each shard repeat the directory listing:
|
|
//
|
|
// - The directory listing part already interacts with data_structures inside scan_state. We
|
|
// would have to either transfer a lot of file information among shards or complicate the code
|
|
// to make sure they all update their own version of scan_state and then merge it.
|
|
// - If all shards scan in parallel, they can start loading sooner. That is faster than having
|
|
// a separate step to fetch all files, followed by another step to distribute and process.
|
|
//
|
|
// For bucket lister, slashes are not considered as path components separator, so in order for
|
|
// the lister to list only basename components, append trailing / to prefix name
|
|
|
|
auto lister = !_client ?
|
|
abstract_lister::make<directory_lister>(_directory, lister::dir_entry_types::of<directory_entry_type::regular>(), &manifest_json_filter) :
|
|
_client->make_object_lister(_bucket, _directory.native() + "/", &manifest_json_filter);
|
|
|
|
co_await with_closeable(std::move(lister), coroutine::lambda([this, &directory] (abstract_lister& lister) -> future<> {
|
|
while (auto de = co_await lister.get()) {
|
|
auto component_path = _directory / de->name;
|
|
auto comps = sstables::parse_path(component_path, directory._schema->ks_name(), directory._schema->cf_name());
|
|
handle(std::move(comps), component_path);
|
|
}
|
|
}));
|
|
|
|
// Always okay to delete files with a temporary TOC. We want to do it before we process
|
|
// the generations seen: it's okay to reuse those generations since the files will have
|
|
// been deleted anyway.
|
|
for (auto& desc: _state->temp_toc_found) {
|
|
auto range = _state->generations_found.equal_range(desc.generation);
|
|
for (auto it = range.first; it != range.second; ++it) {
|
|
auto& path = it->second;
|
|
dirlog.trace("Scheduling to remove file {}, from an SSTable with a Temporary TOC", path.native());
|
|
_state->files_for_removal.insert(path.native());
|
|
}
|
|
_state->generations_found.erase(range.first, range.second);
|
|
_state->descriptors.erase(desc.generation);
|
|
}
|
|
|
|
auto msg = format("After {} scanned, {} descriptors found, {} different files found",
|
|
_directory, _state->descriptors.size(), _state->generations_found.size());
|
|
|
|
// _descriptors is everything with a TOC. So after we remove this, what's left is
|
|
// SSTables for which a TOC was not found.
|
|
auto descriptors = std::move(_state->descriptors);
|
|
co_await directory._manager.dir_semaphore().parallel_for_each(descriptors, [this, flags, &directory] (std::pair<const generation_type, sstables::entry_descriptor>& t) {
|
|
auto& desc = std::get<1>(t);
|
|
_state->generations_found.erase(desc.generation);
|
|
// This will try to pre-load this file and throw an exception if it is invalid
|
|
return directory.process_descriptor(std::move(desc), flags,
|
|
[&directory] { return *directory._storage_opts; });
|
|
});
|
|
|
|
// For files missing TOC, it depends on where this is coming from.
|
|
// If scylla was supposed to have generated this SSTable, this is not okay and
|
|
// we refuse to proceed. If this coming from, say, an import, then we just delete,
|
|
// log and proceed.
|
|
for (auto& path : _state->generations_found | std::views::values) {
|
|
if (flags.throw_on_missing_toc) {
|
|
throw sstables::malformed_sstable_exception(seastar::format("At directory: {}: no TOC found for SSTable {}!. Refusing to boot", _directory.native(), path.native()));
|
|
} else {
|
|
dirlog.info("Found incomplete SSTable {} at directory {}. Removing", path.native(), _directory.native());
|
|
_state->files_for_removal.insert(path.native());
|
|
}
|
|
}
|
|
}
|
|
|
|
future<> sstable_directory::sstables_registry_components_lister::process(sstable_directory& directory, process_flags flags) {
|
|
dirlog.debug("Start processing registry entry {} (state {})", _owner, directory._state);
|
|
return _sstables_registry.sstables_registry_list(_owner, [this, flags, &directory] (sstring status, sstable_state state, entry_descriptor desc) {
|
|
if (state != directory._state) {
|
|
return make_ready_future<>();
|
|
}
|
|
if (status != "sealed") {
|
|
dirlog.warn("Skip processing {} {} entry from {} (must have been picked up by garbage collector)", status, desc.generation, _owner);
|
|
return make_ready_future<>();
|
|
}
|
|
if (!sstable_generation_generator::maybe_owned_by_this_shard(desc.generation)) {
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
dirlog.debug("Processing {} entry from {}", desc.generation, _owner);
|
|
return directory.process_descriptor(std::move(desc), flags,
|
|
[&directory] { return *directory._storage_opts; });
|
|
});
|
|
}
|
|
|
|
future<> sstable_directory::restore_components_lister::process(sstable_directory& directory, process_flags flags) {
|
|
co_await coroutine::parallel_for_each(_toc_filenames, [flags, &directory] (sstring toc_filename) -> future<> {
|
|
std::filesystem::path sst_path{toc_filename};
|
|
entry_descriptor desc = sstables::parse_path(sst_path, "", "");
|
|
if (!sstable_generation_generator::maybe_owned_by_this_shard(desc.generation)) {
|
|
co_return;
|
|
}
|
|
dirlog.debug("Processing {} entry from {}", desc.generation, toc_filename);
|
|
co_await directory.process_descriptor(
|
|
std::move(desc), flags,
|
|
[&directory, prefix=sst_path.parent_path().native()] {
|
|
return directory._storage_opts->append_to_object_storage_prefix(prefix);
|
|
});
|
|
});
|
|
}
|
|
|
|
future<> sstable_directory::commit_directory_changes() {
|
|
return _lister->commit().finally([x = std::move(_lister)] {});
|
|
}
|
|
|
|
future<> sstable_directory::filesystem_components_lister::commit() {
|
|
// Remove all files scheduled for removal
|
|
return parallel_for_each(std::exchange(_state->files_for_removal, {}), [] (sstring path) {
|
|
dirlog.info("Removing file {}", path);
|
|
return remove_file(std::move(path));
|
|
});
|
|
}
|
|
|
|
future<> sstable_directory::sstables_registry_components_lister::commit() {
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
future<> sstable_directory::restore_components_lister::commit() {
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
future<> sstable_directory::sstables_registry_components_lister::garbage_collect(storage& st) {
|
|
std::set<generation_type> gens_to_remove;
|
|
co_await _sstables_registry.sstables_registry_list(_owner, coroutine::lambda([&st, &gens_to_remove] (sstring status, sstable_state state, entry_descriptor desc) -> future<> {
|
|
if (status == "sealed") {
|
|
co_return;
|
|
}
|
|
|
|
dirlog.info("Removing dangling {} {} entry", desc.generation, status);
|
|
gens_to_remove.insert(desc.generation);
|
|
co_await st.remove_by_registry_entry(std::move(desc));
|
|
}));
|
|
co_await coroutine::parallel_for_each(gens_to_remove, [this] (auto gen) -> future<> {
|
|
co_await _sstables_registry.delete_entry(_owner, gen);
|
|
});
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::move_foreign_sstables(sharded<sstable_directory>& source_directory) {
|
|
return parallel_for_each(std::views::iota(0u, smp::count), [this, &source_directory] (unsigned shard_id) mutable {
|
|
auto info_vec = std::exchange(_unshared_remote_sstables[shard_id], {});
|
|
if (info_vec.empty()) {
|
|
return make_ready_future<>();
|
|
}
|
|
// Should be empty, since an SSTable that belongs to this shard is not remote.
|
|
SCYLLA_ASSERT(shard_id != this_shard_id());
|
|
dirlog.debug("Moving {} unshared SSTables of {}.{} to shard {} ", info_vec.size(), _schema->ks_name(), _schema->cf_name(), shard_id);
|
|
return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec));
|
|
});
|
|
}
|
|
|
|
future<shared_sstable> sstable_directory::load_foreign_sstable(foreign_sstable_open_info& info) {
|
|
auto sst = _manager.make_sstable(_schema, *_storage_opts, info.generation, _state, info.version, info.format, db_clock::now(), _error_handler_gen);
|
|
co_await sst->load(std::move(info));
|
|
co_return sst;
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) {
|
|
co_await _manager.dir_semaphore().parallel_for_each(info_vec, [this] (const sstables::entry_descriptor& info) {
|
|
return load_sstable(info, *_storage_opts).then([this] (auto sst) {
|
|
_unshared_local_sstables.push_back(sst);
|
|
return make_ready_future<>();
|
|
});
|
|
});
|
|
}
|
|
|
|
future<std::vector<shard_id>> sstable_directory::get_shards_for_this_sstable(
|
|
const sstables::entry_descriptor& desc, const data_dictionary::storage_options& storage_opts, process_flags flags) const {
|
|
auto sst = _manager.make_sstable(_schema, storage_opts, desc.generation, _state, desc.version, desc.format, db_clock::now(), _error_handler_gen);
|
|
co_await sst->load_owner_shards(_sharder);
|
|
validate(sst, flags);
|
|
co_return sst->get_shards_for_this_sstable();
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::remove_sstables(std::vector<sstables::shared_sstable> sstlist) {
|
|
dirlog.debug("Removing {} SSTables", sstlist.size());
|
|
return parallel_for_each(std::move(sstlist), [] (const sstables::shared_sstable& sst) {
|
|
dirlog.trace("Removing SSTable {}", sst->get_filename());
|
|
return sst->unlink().then([sst] {});
|
|
});
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::collect_output_unshared_sstables(std::vector<sstables::shared_sstable> resharded_sstables, can_be_remote remote_ok) {
|
|
dirlog.debug("Collecting {} output SSTables (remote={})", resharded_sstables.size(), remote_ok);
|
|
return parallel_for_each(std::move(resharded_sstables), [this, remote_ok] (sstables::shared_sstable sst) {
|
|
auto shards = sst->get_shards_for_this_sstable();
|
|
SCYLLA_ASSERT(shards.size() == 1);
|
|
auto shard = shards[0];
|
|
|
|
if (shard == this_shard_id()) {
|
|
dirlog.trace("Collected output SSTable {} already local", sst->get_filename());
|
|
_unshared_local_sstables.push_back(std::move(sst));
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
if (!remote_ok) {
|
|
return make_exception_future<>(std::runtime_error("Unexpected remote sstable"));
|
|
}
|
|
|
|
dirlog.trace("Collected output SSTable {} is remote. Storing it", sst->get_filename());
|
|
_unshared_remote_sstables[shard].push_back(sst->get_descriptor(component_type::Data));
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::remove_unshared_sstables(std::vector<sstables::shared_sstable> sstlist) {
|
|
// When removing input sstables from reshaping: Those SSTables used to be in the unshared local
|
|
// list. So not only do we have to remove them, we also have to update the list. Because we're
|
|
// dealing with a vector it's easier to just reconstruct the list.
|
|
dirlog.debug("Removing {} unshared SSTables", sstlist.size());
|
|
std::unordered_set<sstables::shared_sstable> exclude;
|
|
|
|
for (auto& sst : sstlist) {
|
|
exclude.insert(sst);
|
|
}
|
|
|
|
auto old = std::exchange(_unshared_local_sstables, {});
|
|
|
|
for (auto& sst : old) {
|
|
if (!exclude.contains(sst)) {
|
|
_unshared_local_sstables.push_back(sst);
|
|
}
|
|
}
|
|
|
|
// Do this last for exception safety. If there is an exception on unlink we
|
|
// want to at least leave the SSTable unshared list in a sane state.
|
|
co_await remove_sstables(std::move(sstlist));
|
|
dirlog.debug("Finished removing all SSTables");
|
|
}
|
|
|
|
|
|
future<>
|
|
sstable_directory::do_for_each_sstable(std::function<future<>(sstables::shared_sstable)> func) {
|
|
auto sstables = std::move(_unshared_local_sstables);
|
|
co_await _manager.dir_semaphore().parallel_for_each(sstables, std::move(func));
|
|
}
|
|
|
|
future<>
|
|
sstable_directory::filter_sstables(std::function<future<bool>(sstables::shared_sstable)> func) {
|
|
std::vector<sstables::shared_sstable> filtered;
|
|
co_await _manager.dir_semaphore().parallel_for_each(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> {
|
|
auto keep = co_await func(sst);
|
|
if (keep) {
|
|
filtered.emplace_back(sst);
|
|
}
|
|
});
|
|
_unshared_local_sstables = std::move(filtered);
|
|
}
|
|
|
|
void
|
|
sstable_directory::store_phaser(utils::phased_barrier::operation op) {
|
|
_operation_barrier.emplace(std::move(op));
|
|
}
|
|
|
|
sstable_directory::sstable_open_info_vector
|
|
sstable_directory::retrieve_shared_sstables() {
|
|
return std::exchange(_shared_sstable_info, {});
|
|
}
|
|
|
|
bool sstable_directory::compare_sstable_storage_prefix(const sstring& prefix_a, const sstring& prefix_b) noexcept {
|
|
size_t size_a = prefix_a.size();
|
|
if (prefix_a.back() == '/') {
|
|
size_a--;
|
|
}
|
|
size_t size_b = prefix_b.size();
|
|
if (prefix_b.back() == '/') {
|
|
size_b--;
|
|
}
|
|
return size_a == size_b && sstring::traits_type::compare(prefix_a.begin(), prefix_b.begin(), size_a) == 0;
|
|
}
|
|
|
|
future<sstring> sstable_directory::create_pending_deletion_log(opened_directory& base_dir, const std::vector<shared_sstable>& ssts) {
|
|
return seastar::async([&] {
|
|
min_max_tracker<generation_type> gen_tracker;
|
|
sstring pending_delete_log;
|
|
|
|
for (const auto& sst : ssts) {
|
|
gen_tracker.update(sst->generation());
|
|
}
|
|
|
|
sstring pending_delete_dir = (base_dir.path() / sstables::pending_delete_dir).native();
|
|
pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
|
|
sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
|
|
dirlog.trace("Writing {}", tmp_pending_delete_log);
|
|
|
|
touch_directory(pending_delete_dir).get();
|
|
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
|
// Create temporary pending_delete log file.
|
|
auto f = open_file_dma(tmp_pending_delete_log, oflags).get();
|
|
// Write all toc names into the log file.
|
|
auto out = make_file_output_stream(std::move(f), 4096).get();
|
|
auto close_out = deferred_close(out);
|
|
|
|
try {
|
|
auto trim_size = base_dir.native().size() + 1; // Account for the '/' delimiter
|
|
for (const auto& sst : ssts) {
|
|
sstring toc = seastar::to_sstring(sst->toc_filename());
|
|
if (toc.size() <= trim_size) {
|
|
on_internal_error(dirlog, fmt::format("Sstable {} outside of basedir {} is scheduled for deletion", toc, base_dir.native()));
|
|
}
|
|
out.write(toc.begin() + trim_size, toc.size() - trim_size).get();
|
|
out.write("\n").get();
|
|
dirlog.trace("Wrote '{}' to {}", sstring(toc.begin() + trim_size, toc.size() - trim_size), tmp_pending_delete_log);
|
|
}
|
|
|
|
out.flush().get();
|
|
close_out.close_now();
|
|
} catch (...) {
|
|
dirlog.warn("Error while writing {}: {}. Ignoring.", tmp_pending_delete_log, std::current_exception());
|
|
}
|
|
|
|
// Once flushed and closed, the temporary log file can be renamed.
|
|
io_check(rename_file, tmp_pending_delete_log, pending_delete_log).get();
|
|
|
|
// Guarantee that the changes above reached the disk.
|
|
base_dir.sync(general_disk_error_handler).get();
|
|
dirlog.debug("{} written successfully.", pending_delete_log);
|
|
|
|
return pending_delete_log;
|
|
});
|
|
}
|
|
|
|
// FIXME: Go through maybe_delete_large_partitions_entry on recovery since
|
|
// this is an indication we crashed in the middle of atomic deletion
|
|
future<> sstable_directory::filesystem_components_lister::replay_pending_delete_log(fs::path pending_delete_log) {
|
|
dirlog.debug("Reading pending_deletes log file {}", pending_delete_log);
|
|
fs::path pending_delete_dir = pending_delete_log.parent_path();
|
|
try {
|
|
sstring sstdir = pending_delete_dir.parent_path().native();
|
|
auto text = co_await seastar::util::read_entire_file_contiguous(pending_delete_log);
|
|
|
|
sstring all(text.begin(), text.end());
|
|
std::vector<sstring> basenames;
|
|
boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on);
|
|
auto tocs = std::ranges::to<std::vector<sstring>>(basenames | std::views::filter([] (auto&& basename) { return !basename.empty(); }));
|
|
dirlog.debug("TOCs to remove: {}", tocs);
|
|
co_await parallel_for_each(tocs, [&sstdir] (const sstring& name) {
|
|
// Only move TOC to TOC.tmp, the rest will be finished by regular process
|
|
return make_toc_temporary(sstdir + "/" + name).discard_result();
|
|
});
|
|
dirlog.debug("Replayed {}, removing", pending_delete_log);
|
|
co_await remove_file(pending_delete_log.native());
|
|
} catch (...) {
|
|
dirlog.warn("Error replaying {}: {}. Ignoring.", pending_delete_log, std::current_exception());
|
|
}
|
|
}
|
|
|
|
future<> sstable_directory::filesystem_components_lister::garbage_collect(storage& st) {
|
|
// First pass, cleanup temporary sstable directories and sstables pending delete.
|
|
co_await cleanup_column_family_temp_sst_dirs();
|
|
co_await handle_sstables_pending_delete();
|
|
}
|
|
|
|
future<> sstable_directory::filesystem_components_lister::cleanup_column_family_temp_sst_dirs() {
|
|
directory_lister lister(_directory, lister::dir_entry_types::of<directory_entry_type::directory>());
|
|
auto futures = co_await with_closeable(std::move(lister), [this] (directory_lister& lister) -> future<std::vector<future<>>> {
|
|
std::vector<future<>> futures;
|
|
while (auto de = co_await lister.get()) {
|
|
// push futures that remove files/directories into an array of futures,
|
|
// so that the supplied callback will not block lister from
|
|
// reading the next entry in the directory.
|
|
fs::path dirpath = _directory / de->name;
|
|
if (dirpath.extension().string() == tempdir_extension) {
|
|
dirlog.info("Found temporary sstable directory: {}, removing", dirpath);
|
|
futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); }));
|
|
}
|
|
}
|
|
co_return futures;
|
|
});
|
|
|
|
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
|
|
}
|
|
|
|
future<> sstable_directory::filesystem_components_lister::handle_sstables_pending_delete() {
|
|
auto pending_delete_dir = _directory / sstables::pending_delete_dir;
|
|
auto exists = co_await file_exists(pending_delete_dir.native());
|
|
if (!exists) {
|
|
co_return;
|
|
}
|
|
|
|
directory_lister lister(pending_delete_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
|
auto futures = co_await with_closeable(std::move(lister), coroutine::lambda([this, &pending_delete_dir] (directory_lister& lister) -> future<std::vector<future<>>> {
|
|
std::vector<future<>> futures;
|
|
while (auto de = co_await lister.get()) {
|
|
// push nested futures that remove files/directories into an array of futures,
|
|
// so that the supplied callback will not block lister from
|
|
// reading the next entry in the directory.
|
|
fs::path file_path = pending_delete_dir / de->name;
|
|
if (file_path.extension() == ".tmp") {
|
|
dirlog.info("Found temporary pending_delete log file: {}, deleting", file_path);
|
|
futures.push_back(remove_file(file_path.string()));
|
|
} else if (file_path.extension() == ".log") {
|
|
dirlog.info("Found pending_delete log file: {}, replaying", file_path);
|
|
auto f = replay_pending_delete_log(std::move(file_path));
|
|
futures.push_back(std::move(f));
|
|
} else {
|
|
dirlog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);
|
|
}
|
|
}
|
|
co_return futures;
|
|
}));
|
|
|
|
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
|
|
}
|
|
|
|
}
|