Files
scylladb/sstables/storage.cc
Botond Dénes 85f05fbe1b Revert "Merge 'Add digests for all sstable components in scylla metadata' from Taras Veretilnyk"
This reverts commit 866c96f536, reversing
changes made to 367633270a.

This change caused all longevities to fail, with a crash in parsing
scylla-metadata. The investigation is still ongoing, with no quick fix
in sight yet.

Fixes: #27496

Closes scylladb/scylladb#27518
2025-12-16 11:34:40 +02:00

925 lines
43 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "storage.hh"
#include <cerrno>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/erase.hpp>
#include <exception>
#include <stdexcept>
#include <fmt/std.h>
#include <seastar/core/when_all.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>
#include <seastar/util/closeable.hh>
#include "db/config.hh"
#include "db/extensions.hh"
#include "sstables/exceptions.hh"
#include "sstables/object_storage_client.hh"
#include "sstables/sstable_directory.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_version.hh"
#include "sstables/integrity_checked_file_impl.hh"
#include "sstables/writer.hh"
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "utils/overloaded_functor.hh"
#include "utils/memory_data_sink.hh"
#include "utils/s3/client.hh"
#include "utils/exceptions.hh"
#include "utils/to_string.hh"
#include "utils/checked-file-impl.hh"
#include "utils/io-wrappers.hh"
namespace sstables {
// cannot define these classes in an anonymous namespace, as we need to
// declare these storage classes as "friend" of class sstable
// Storage implementation that keeps sstable component files in directories
// on a local filesystem. Most members are only declared here and are defined
// further down in this file.
class filesystem_storage final : public sstables::storage {
// Directory the storage was constructed with.
mutable opened_directory _base_dir;
// Directory derived from the constructor's dir and state through make_path();
// replaced by change_dir().
mutable opened_directory _dir;
std::optional<std::filesystem::path> _temp_dir; // Valid while the sstable is being created, until sealed
private:
using mark_for_removal = bool_class<class mark_for_removal_tag>;
// Builds the file name of component `comp` of `sst` as if the sstable lived in
// `dir` with generation `gen`; keyspace, table, version and format are taken from `sst`.
template <typename Comp>
requires std::is_same_v<Comp, component_type> || std::is_same_v<Comp, sstring>
static auto filename(const sstable& sst, sstring dir, generation_type gen, Comp comp) {
return sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, gen, sst._format, comp);
}
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
future<> remove_temp_dir();
virtual future<> create_links(const sstable& sst, const std::filesystem::path& dir) const override;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal mark_for_removal) const;
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> dst_gen) const;
future<> touch_temp_dir(const sstable& sst);
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay) override;
future<> rename_new_file(const sstable& sst, sstring from_name, sstring to_name) const;
// Points _dir at `new_dir` and closes the previously opened directory.
future<> change_dir(sstring new_dir) {
auto old_dir = std::exchange(_dir, opened_directory(new_dir));
return old_dir.close();
}
virtual future<> change_dir_for_test(sstring nd) override {
return change_dir(nd);
}
public:
// `dir` becomes the base directory; the working directory is make_path(dir, state).
explicit filesystem_storage(sstring dir, sstable_state state)
: _base_dir(dir)
, _dir(make_path(dir, state))
{}
virtual future<> seal(const sstable& sst) override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const override;
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
future<data_source> make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;
// Nothing to release for this storage: resolves immediately.
virtual future<> destroy(const sstable& sst) override { return make_ready_future<>(); }
virtual future<atomic_delete_context> atomic_delete_prepare(const std::vector<shared_sstable>&) const override;
virtual future<> atomic_delete_complete(atomic_delete_context ctx) const override;
virtual future<> remove_by_registry_entry(entry_descriptor desc) override;
// Asks seastar::fs_avail() about the directory returned by prefix().
virtual future<uint64_t> free_space() const override {
return seastar::fs_avail(prefix());
}
virtual future<> unlink_component(const sstable& sst, component_type) noexcept override;
// The current working directory, in native string form.
virtual sstring prefix() const override { return _dir.native(); }
};
// Creates the data sink for writing one of the Data, Index, Rows or
// Partitions components. The matching file object held by the sstable is
// moved into the sink. Any other component type trips the assertion.
future<data_sink> filesystem_storage::make_data_or_index_sink(sstable& sst, component_type type) {
    SCYLLA_ASSERT(
        type == component_type::Data
        || type == component_type::Index
        || type == component_type::Rows
        || type == component_type::Partitions);

    file_output_stream_options options;
    options.buffer_size = sst.sstable_buffer_size;
    options.write_behind = 10;

    // Shared tail of every case below: hand the sstable's file over to a sink.
    const auto sink_from = [&options] (auto& component_file) {
        return make_file_data_sink(std::move(component_file), options);
    };

    switch (type) {
    case component_type::Data:
        return sink_from(sst._data_file);
    case component_type::Index:
        return sink_from(sst._index_file);
    case component_type::Rows:
        return sink_from(sst._rows_file);
    case component_type::Partitions:
        return sink_from(sst._partitions_file);
    default:
        abort();
    }
}
// Wraps the already-opened file `f` in a data source that reads `len` bytes
// starting at `offset`. Only Data and Index components are accepted here.
future<data_source> filesystem_storage::make_data_or_index_source(sstable&, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const {
    SCYLLA_ASSERT(type == component_type::Data || type == component_type::Index);
    auto source = make_file_data_source(std::move(f), offset, len, std::move(opt));
    co_return source;
}
// Creates a new component file for `type` through the sstable and, once it
// is open, turns it into a data sink configured with `options`.
future<data_sink> filesystem_storage::make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) {
    auto to_sink = [options = std::move(options)] (file component_file) mutable {
        return make_file_data_sink(std::move(component_file), std::move(options));
    };
    return sst.new_sstable_component_file(sst._write_error_handler, type, oflags).then(std::move(to_sink));
}
// Opens `name` for DMA access. The integrity-checked variant is used only
// when integrity checking was requested and the file is not opened read-only.
static future<file> open_sstable_component_file_non_checked(std::string_view name, open_flags flags, file_open_options options,
        bool check_integrity) noexcept {
    const bool use_integrity_check = check_integrity && flags != open_flags::ro;
    if (!use_integrity_check) {
        return open_file_dma(name, flags, options);
    }
    return open_integrity_checked_file_dma(name, flags, options);
}
// Renames `from_name` to `to_name` through the sstable's write I/O check.
// A failure is logged and then propagated to the caller unchanged.
future<> filesystem_storage::rename_new_file(const sstable& sst, sstring from_name, sstring to_name) const {
    auto log_and_propagate = [from_name, to_name] (std::exception_ptr ep) {
        sstlog.error("Could not rename SSTable component {} to {}. Found exception: {}", from_name, to_name, ep);
        return make_exception_future<>(ep);
    };
    return sst.sstable_write_io_check(rename_file, from_name, to_name)
            .handle_exception(std::move(log_and_propagate));
}
// Gives every registered file I/O extension a chance to wrap the file that
// `f` resolves to. TOC and TemporaryTOC components are never wrapped. An
// extension that returns a null file leaves the current file in place.
static future<file> maybe_wrap_file(const sstable& sst, component_type type, open_flags flags, future<file> f) {
    const bool is_toc = type == component_type::TOC || type == component_type::TemporaryTOC;
    if (is_toc) {
        return f;
    }
    for (auto* extension : sst.manager().file_io_extensions()) {
        f = with_file_close_on_failure(std::move(f), [extension, &sst, type, flags] (file opened) {
            return extension->wrap_file(sst, type, opened, flags).then([opened] (file wrapped) mutable {
                return wrapped ? wrapped : std::move(opened);
            });
        });
    }
    return f;
}
// Convenience overload for a file that is already open: lifts it into a
// ready future and defers to the future-based overload.
static future<file> maybe_wrap_file(const sstable& sst, component_type type, open_flags flags, file f) {
    auto ready = make_ready_future<file>(std::move(f));
    return maybe_wrap_file(sst, type, flags, std::move(ready));
}
// Opens the file backing component `type`.
//
// When `flags` request exclusive creation, the file is created in the
// temporary directory (if one is set) and then renamed to its final name;
// otherwise it is opened directly from the sstable directory. The resulting
// file is finally offered to the file I/O extensions.
future<file> filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
    const auto exclusive_create = open_flags::create | open_flags::exclusive;
    const bool creating = (flags & exclusive_create) == exclusive_create;

    auto dir = (creating && _temp_dir) ? *_temp_dir : _dir.path();
    auto path = dir / sst.component_basename(type);

    auto opened = open_sstable_component_file_non_checked(path.native(), flags, options, check_integrity);
    if (creating) {
        // Move the freshly created file to its final name; close it if that fails.
        opened = with_file_close_on_failure(std::move(opened), [this, &sst, type, path = std::move(path)] (file fd) mutable {
            auto final_name = fmt::to_string(sst.filename(type));
            return rename_new_file(sst, path.native(), std::move(final_name)).then([fd = std::move(fd)] () mutable {
                return make_ready_future<file>(std::move(fd));
            });
        });
    }
    return maybe_wrap_file(sst, type, flags, std::move(opened));
}
// Starts the on-disk creation of an sstable: makes sure the temporary
// directory exists, creates the TemporaryTOC component exclusively and writes
// the TOC content into it, then syncs the sstable directory.
// Blocks on futures with .get(), so it has to run in async context.
// Throws std::runtime_error if a TOC file already exists for this sstable.
void filesystem_storage::open(sstable& sst) {
touch_temp_dir(sst).get();
// Writing TOC content to temporary file.
// If creation of temporary TOC failed, it implies that boot failed to
// delete a sstable with temporary for this column family, or there is a
// sstable being created in parallel with the same generation.
file_output_stream_options options;
options.buffer_size = 4096;
auto sink = make_component_sink(sst, component_type::TemporaryTOC,
open_flags::wo |
open_flags::create |
open_flags::exclusive,
options).get();
auto w = file_writer(output_stream<char>(std::move(sink)), component_name(sst, component_type::TemporaryTOC));
bool toc_exists = file_exists(fmt::to_string(sst.filename(component_type::TOC))).get();
if (toc_exists) {
// TOC will exist at this point if write_components() was called with
// the generation of a sstable that exists.
// Undo our own TemporaryTOC before reporting the conflict.
w.close();
remove_file(fmt::to_string(sst.filename(component_type::TemporaryTOC))).get();
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name()));
}
sst.write_toc(std::move(w));
// Flushing parent directory to guarantee that temporary TOC file reached
// the disk.
_dir.sync(sst._write_error_handler).get();
}
// Finishes the creation of an sstable: removes the temporary directory,
// syncs the sstable directory, renames TemporaryTOC to TOC and syncs the
// directory again so the rename itself is flushed.
future<> filesystem_storage::seal(const sstable& sst) {
// SSTable sealing is about renaming temporary TOC file after guaranteeing
// that each component reached the disk safely.
co_await remove_temp_dir();
// Guarantee that every component of this sstable reached the disk.
co_await _dir.sync(sst._write_error_handler);
// Rename TOC because it's no longer temporary.
co_await sst.sstable_write_io_check(rename_file, fmt::to_string(sst.filename(component_type::TemporaryTOC)), fmt::to_string(sst.filename(component_type::TOC)));
co_await _dir.sync(sst._write_error_handler);
// If this point was reached, sstable should be safe in disk.
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", sst._generation, sst._schema->ks_name(), sst._schema->cf_name());
}
// Makes sure the per-sstable temporary directory exists and remembers its
// path in _temp_dir. Does nothing when the path is already recorded.
future<> filesystem_storage::touch_temp_dir(const sstable& sst) {
    if (!_temp_dir) {
        auto candidate = _dir.path() / fmt::format("{}{}", sst._generation, tempdir_extension);
        sstlog.debug("Touching temp_dir={}", candidate);
        co_await sst.sstable_touch_directory_io_check(candidate);
        // Record the path only after the directory was touched successfully.
        _temp_dir = std::move(candidate);
    }
}
// Removes the temporary directory, if one is recorded, and forgets its path.
// A removal failure is logged and propagated; _temp_dir is then left set.
future<> filesystem_storage::remove_temp_dir() {
    if (!_temp_dir) {
        co_return;
    }
    sstlog.debug("Removing temp_dir={}", _temp_dir);
    future<> removal = co_await coroutine::as_future(remove_file(_temp_dir->native()));
    if (removal.failed()) {
        auto eptr = removal.get_exception();
        sstlog.error("Could not remove temporary directory: {}", eptr);
        co_await coroutine::return_exception_ptr(std::move(eptr));
    }
    _temp_dir.reset();
}
// Two stat results describe the same file when both the inode number and
// the device id match.
static bool is_same_file(const seastar::stat_data& sd1, const seastar::stat_data& sd2) noexcept {
    const bool same_inode = sd1.inode_number == sd2.inode_number;
    const bool same_device = sd1.device_id == sd2.device_id;
    return same_device && same_inode;
}
// Stats both paths and resolves to true when they refer to the same file.
// Fails if either stat fails.
static future<bool> same_file(sstring path1, sstring path2) noexcept {
    return when_all_succeed(file_stat(std::move(path1)), file_stat(std::move(path2)))
            .then_unpack([] (seastar::stat_data first, seastar::stat_data second) {
        return is_same_file(first, second);
    });
}
// support replay of link by considering link_file EEXIST error as successful when the newpath is hard linked to oldpath.
//
// Resolves successfully when link_file() succeeds, or when it fails with
// EEXIST and same_file() reports that both paths refer to the same file.
// Any other outcome fails with the exception originally raised by link_file();
// a failure of same_file() itself propagates as is.
future<> idempotent_link_file(sstring oldpath, sstring newpath) noexcept {
bool exists = false;
std::exception_ptr ex;
try {
co_await link_file(oldpath, newpath);
} catch (const std::system_error& e) {
// Keep the exception for later and note whether it was EEXIST.
ex = std::current_exception();
exists = (e.code().value() == EEXIST);
} catch (...) {
ex = std::current_exception();
}
if (!ex) {
co_return;
}
// The same_file() check is done outside of the catch blocks above.
if (exists && (co_await same_file(oldpath, newpath))) {
co_return;
}
co_await coroutine::return_exception_ptr(std::move(ex));
}
// Check if the operation is replayed, possibly when moving sstables
// from staging to the base dir, for example, right after create_links completes,
// and right before deleting the source links.
// We end up in two valid sstables in this case, so make create_links idempotent.
//
// For every component in `comps` whose destination file already exists, the
// destination must be the same file as the source. A destination that is a
// different file fails with malformed_sstable_exception; a failure to compare
// the two files is logged and propagated.
future<> filesystem_storage::check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen,
const std::vector<std::pair<sstables::component_type, sstring>>& comps) const {
return parallel_for_each(comps, [this, &sst, &dst_dir, dst_gen] (const auto& p) mutable -> future<> {
auto comp = p.second;
auto src = filename(sst, _dir.native(), sst._generation, comp);
auto dst = filename(sst, dst_dir, dst_gen, comp);
// A missing destination is fine: it simply has not been linked yet.
if (co_await file_exists(dst)) {
future<bool> fut = co_await coroutine::as_future(same_file(src, dst));
if (fut.failed()) {
auto eptr = fut.get_exception();
sstlog.error("Error while linking SSTable: {} to {}: {}", src, dst, eptr);
co_await coroutine::return_exception_ptr(std::move(eptr));
}
auto same = fut.get();
if (!same) {
auto msg = format("Error while linking SSTable: {} to {}: File exists", src, dst);
sstlog.error("{}", msg);
co_await coroutine::return_exception(malformed_sstable_exception(msg));
}
}
});
}
/// create_links_common links all component files from the sstable directory to
/// the given destination directory, using the provided generation.
///
/// It first checks if this is a replay of a previous
/// create_links call, by testing if the destination names already
/// exist, and if so, if they point to the same inodes as the
/// source names. Otherwise, we return an error.
/// This is an indication that something went wrong.
///
/// Creating the links is done by:
/// First, linking the source TOC component to the destination TemporaryTOC,
/// to mark the destination for rollback, in case we crash mid-way.
/// Then, all components are linked.
///
/// Note that if scylla crashes at this point, the destination SSTable
/// will have both a TemporaryTOC file and a regular TOC file.
/// It should be deleted on restart, thus rolling the operation backwards.
///
/// Eventually, if \c mark_for_removal is unset, the destination
/// TemporaryTOC is removed, to "commit" the destination sstable;
///
/// Otherwise, if \c mark_for_removal is set, the TemporaryTOC at the destination
/// is moved to the source directory to mark the source sstable for removal,
/// thus atomically toggling crash recovery from roll-back to roll-forward.
///
/// Similar to the scenario described above, crashing at this point
/// would leave the source sstable marked for removal, possibly
/// having both a TemporaryTOC file and a regular TOC file, and
/// then the source sstable should be deleted on restart, rolling the
/// operation forward.
///
/// Note that idempotent versions of link_file and rename_file
/// are used. These versions handle EEXIST errors that may happen
/// when the respective operations are replayed.
///
/// \param sst - the sstable to work on
/// \param dst_dir - the destination directory.
/// \param generation - the generation of the destination sstable
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, mark_for_removal mark_for_removal) const {
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
auto comps = sst.all_components();
co_await check_create_links_replay(sst, dst_dir, generation, comps);
// TemporaryTOC is always first, TOC is always last
auto dst = filename(sst, dst_dir, generation, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(idempotent_link_file, fmt::to_string(sst.filename(component_type::TOC)), std::move(dst));
// Sync the destination directory so the TemporaryTOC link is flushed
// before any other component is linked.
auto dir = opened_directory(dst_dir);
co_await dir.sync(sst._write_error_handler);
// Link every component listed by all_components() into the destination.
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
auto src = filename(sst, _dir.native(), sst._generation, p.second);
auto dst = filename(sst, dst_dir, generation, p.second);
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
});
co_await dir.sync(sst._write_error_handler);
auto dst_temp_toc = filename(sst, dst_dir, generation, component_type::TemporaryTOC);
if (mark_for_removal) {
// Now that the source sstable is linked to new_dir, mark the source links for
// deletion by leaving a TemporaryTOC file in the source directory.
auto src_temp_toc = filename(sst, _dir.native(), sst._generation, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
co_await _dir.sync(sst._write_error_handler);
} else {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
}
// Flush the removal (or the rename away) of the destination TemporaryTOC.
co_await dir.sync(sst._write_error_handler);
co_await dir.close();
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
}
// Path-based convenience overload: links into `dir` without marking the
// source for removal, using `gen` when given and the sstable's own
// generation otherwise.
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen) const {
    const generation_type dst_gen = gen.value_or(sst._generation);
    return create_links_common(sst, dir.native(), dst_gen, mark_for_removal::no);
}
// Links all components into `dir`, keeping the sstable's generation and
// leaving the source sstable untouched.
future<> filesystem_storage::create_links(const sstable& sst, const std::filesystem::path& dir) const {
    const auto& dst_dir = dir.native();
    return create_links_common(sst, dst_dir, sst._generation, mark_for_removal::no);
}
// Hard-links the sstable into a snapshot directory. `dir` is used as is when
// `abs` is set, otherwise it is resolved relative to the sstable directory.
// The directory is touched before the links are created.
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
    std::filesystem::path target;
    if (!abs) {
        target = _dir.path() / dir;
    } else {
        target = dir;
    }
    co_await sst.sstable_touch_directory_io_check(target);
    co_await create_links_common(sst, target, std::move(gen));
}
// Moves the sstable to `new_dir` under `new_generation`.
//
// The components are first linked into the destination with the source
// marked for removal (see create_links_common), then this storage switches
// to the new directory and the old component files and the old TemporaryTOC
// are removed. When `delay_commit` is null both directories are synced here;
// otherwise they are only recorded in `delay_commit->_dirs`.
future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
co_await touch_directory(new_dir);
// Remember the source directory: _dir is replaced by change_dir() below.
sstring old_dir = _dir.native();
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
co_await change_dir(new_dir);
generation_type old_generation = sst._generation;
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
return sst.sstable_write_io_check(remove_file, filename(sst, old_dir, old_generation, p.second));
});
// The TemporaryTOC that marked the source for removal goes last.
auto temp_toc = sstable_version_constants::get_component_map(sst._version).at(component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(remove_file, filename(sst, old_dir, old_generation, temp_toc));
if (delay_commit == nullptr) {
co_await when_all(sst.sstable_write_io_check(sync_directory, old_dir), _dir.sync(sst._write_error_handler)).discard_result();
} else {
delay_commit->_dirs.insert(old_dir);
delay_commit->_dirs.insert(new_dir);
}
}
// Moves the sstable into the directory that corresponds to `state`, giving it
// `new_generation`. Does nothing when the sstable already is in that state.
future<> filesystem_storage::change_state(const sstable& sst, sstable_state state, generation_type new_generation, delayed_commit_changes* delay_commit) {
    auto to = state_to_dir(state);
    auto path = _dir.path();
    auto current = path.filename().native();

    // Moving between states means moving between basedir/state subdirectories.
    // However, normal state maps to the basedir itself and thus there's no way
    // to check if current is normal_dir. The best that can be done here is to
    // check that it's not anything else
    const bool in_state_subdir = current == staging_dir || current == upload_dir || current == quarantine_dir;
    if (!in_state_subdir) {
        current = normal_dir;
        path = path / to;
    } else if (to == quarantine_dir && current != staging_dir) {
        // Legacy exception -- quarantine from anything but staging
        // moves to the current directory quarantine subdir
        path = path / to;
    } else {
        path = path.parent_path() / to;
    }

    if (current == to) {
        co_return; // Already there
    }

    sstlog.info("Moving sstable {} to {}", sst.get_filename(), path);
    co_await move(sst, path.native(), std::move(new_generation), delay_commit);
}
// Returns the directory that contains `fname`, after canonicalizing the path.
static inline fs::path parent_path(const sstring& fname) {
    const auto resolved = fs::canonical(fs::path(fname));
    return resolved.parent_path();
}
// Deletes the sstable's files on a best-effort basis.
//
// The TOC is first made temporary via make_toc_temporary(); if that yields a
// name, all other components are removed (ENOENT is forgiven), the directory
// is synced when `sync` is set, and the temporary TOC is removed last.
// Failures are logged and swallowed. The temporary directory, if still
// recorded, is removed recursively afterwards.
future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// We must be able to generate toc_filename()
// in order to delete the sstable.
// Running out of memory here will terminate.
auto name = [&sst] () noexcept {
memory::scoped_critical_alloc_section _;
return fmt::to_string(sst.toc_filename());
}();
try {
auto new_toc_name = co_await make_toc_temporary(name, sync);
if (!new_toc_name.empty()) {
auto dir_name = parent_path(new_toc_name);
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, &dir_name] (auto component) -> future<> {
if (component.first == component_type::TOC) {
// already renamed
co_return;
}
auto fname = filename(sst, dir_name.native(), sst._generation, component.second);
try {
co_await sst.sstable_write_io_check(remove_file, fname);
} catch (...) {
if (!is_system_error_errno(ENOENT)) {
throw;
}
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
}
});
if (sync) {
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
}
// The temporary TOC is removed only after all other components are gone.
co_await sst.sstable_write_io_check(remove_file, new_toc_name);
}
} catch (...) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
// b. in the future sstables_manager is planned to handle sstables deletion.
// c. Eventually we may want to record these failures in a system table
// and notify the administrator about that for manual handling (rather than aborting).
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, std::current_exception());
}
if (_temp_dir) {
try {
co_await recursive_remove_directory(*_temp_dir);
// Forget the path only when the removal succeeded.
_temp_dir.reset();
} catch (...) {
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, std::current_exception());
}
}
}
// Prepares the deletion of a group of sstables: collects the storage prefix
// of every sstable and creates the pending-deletion log in the base
// directory. Both are returned in the context.
future<atomic_delete_context> filesystem_storage::atomic_delete_prepare(const std::vector<shared_sstable>& ssts) const {
    atomic_delete_context ctx;
    for (const auto& sst : ssts) {
        ctx.prefixes.insert(sst->_storage->prefix());
    }
    ctx.pending_delete_log = co_await sstable_directory::create_pending_deletion_log(_base_dir, ssts);
    co_return std::move(ctx);
}
// Completes a deletion started by atomic_delete_prepare(): syncs every
// directory recorded in the context and then removes the pending-deletion
// log. A failure to remove the log is logged and ignored.
future<> filesystem_storage::atomic_delete_complete(atomic_delete_context ctx) const {
    co_await coroutine::parallel_for_each(ctx.prefixes, [] (const auto& prefix_dir) -> future<> {
        co_await sync_directory(prefix_dir);
    });

    // Once all sstables are deleted, the log file can be removed.
    // Note: the log file will be removed also if unlink failed to remove
    // any sstable and ignored the error.
    const auto& pending_log = ctx.pending_delete_log;
    try {
        co_await remove_file(pending_log);
        sstlog.debug("{} removed.", pending_log);
    } catch (...) {
        sstlog.warn("Error removing {}: {}. Ignoring.", pending_log, std::current_exception());
    }
}
// Not supported for filesystem storage: always reports an internal error.
future<> filesystem_storage::remove_by_registry_entry(entry_descriptor desc) {
on_internal_error(sstlog, "Filesystem storage doesn't keep its entries in registry");
}
// Removes the file of a single component. Best-effort: any failure, including
// a failure to build the file name, is logged and ignored.
future<> filesystem_storage::unlink_component(const sstable& sst, component_type type) noexcept {
// Declared outside of the try block so the handler below can log it;
// it is still empty if building the name itself failed.
std::string name;
try {
name = fmt::to_string(sst.filename(type));
co_await sst.sstable_write_io_check(remove_file, name);
} catch (...) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
// b. in the future sstables_manager is planned to handle sstables deletion.
// c. Eventually we may want to record these failures in a system table
// and notify the administrator about that for manual handling (rather than aborting).
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, std::current_exception());
}
}
// Common base for storages that keep sstable components as objects in a
// bucket, accessed through an object_storage_client.
//
// The location of an sstable's objects is either a string prefix or the id
// of the owning table; in the latter case the sstables registry is used to
// track the entries (see owner()).
class object_storage_base : public sstables::storage {
protected:
    // Human-readable storage type name, used in error messages.
    sstring _type;
    shared_ptr<sstables::object_storage_client> _client;
    sstring _bucket;
    std::variant<sstring, table_id> _location;
    // Passed to the client calls that accept an abort source.
    seastar::abort_source* _as;

    // Status values written to the sstables registry.
    static constexpr auto status_creating = "creating";
    static constexpr auto status_sealed = "sealed";
    static constexpr auto status_removing = "removing";

    object_name make_object_name(const sstable& sst, component_type type) const;

    // Returns the owning table id. It is an internal error to call this on
    // a storage whose location is a string prefix.
    table_id owner() const {
        if (std::holds_alternative<sstring>(_location)) {
            on_internal_error(sstlog, format("Storage holds {} prefix, but registry owner is expected", std::get<sstring>(_location)));
        }
        return std::get<table_id>(_location);
    }

    seastar::abort_source* abort_source() const {
        return _as;
    }

public:
    // All by-value arguments are moved into the members.
    object_storage_base(sstring type, shared_ptr<sstables::object_storage_client> client, sstring bucket, std::variant<sstring, table_id> loc, seastar::abort_source* as)
        : _type(std::move(type))
        , _client(std::move(client))
        , _bucket(std::move(bucket))
        , _location(std::move(loc))
        , _as(as)
    {}

    future<> seal(const sstable& sst) override;
    future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>) const override;
    future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
    // runs in async context
    void open(sstable& sst) override;
    future<> wipe(const sstable& sst, sync_dir) noexcept override;
    future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
    future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
    future<data_source> make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
    future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;

    // Nothing to release for this storage: resolves immediately.
    future<> destroy(const sstable& sst) override {
        return make_ready_future<>();
    }

    future<atomic_delete_context> atomic_delete_prepare(const std::vector<shared_sstable>&) const override;
    future<> atomic_delete_complete(atomic_delete_context ctx) const override;
    future<> remove_by_registry_entry(entry_descriptor desc) override;

    future<uint64_t> free_space() const override {
        // assumes infinite space on s3/gs (https://aws.amazon.com/s3/faqs/#How_much_data_can_I_store).
        return make_ready_future<uint64_t>(std::numeric_limits<uint64_t>::max());
    }

    future<> unlink_component(const sstable& sst, component_type) noexcept override;

    // The location (string prefix or table id) formatted as a string.
    sstring prefix() const override {
        return std::visit([] (const auto& v) { return fmt::to_string(v); }, _location);
    }

    // Thin forwarding helpers around the client; the ones that accept an
    // abort source are given this storage's.
    future<> put_object(object_name name, ::memory_data_sink_buffers bufs) {
        return _client->put_object(std::move(name), std::move(bufs), abort_source());
    }
    future<> delete_object(object_name name) {
        return _client->delete_object(std::move(name));
    }
    file make_readable_file(object_name name) {
        return _client->make_readable_file(std::move(name), abort_source());
    }
    data_sink make_data_upload_sink(object_name name, std::optional<unsigned> max_parts_per_piece) {
        return _client->make_data_upload_sink(std::move(name), max_parts_per_piece, abort_source());
    }
    data_sink make_upload_sink(object_name name) {
        return _client->make_upload_sink(std::move(name), abort_source());
    }
};
// Object storage registered under the type name "S3". It only overrides how
// data/index sources are created; everything else comes from the base class.
class s3_storage : public object_storage_base {
public:
s3_storage(shared_ptr<sstables::object_storage_client> client, sstring bucket, std::variant<sstring, table_id> loc, seastar::abort_source* as)
: object_storage_base("S3", std::move(client), std::move(bucket), std::move(loc), as)
{}
future<data_source> make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
};
// Builds the object name of component `type` of `sst`.
//
// With a string-prefix location the name is made of the bucket, the prefix
// and the component's base name. With a table-id location it is made of the
// bucket, the sstable generation and the component's name from the version's
// component map. Throws std::runtime_error for non-UUID generations.
object_name object_storage_base::make_object_name(const sstable& sst, component_type type) const {
    if (!sst.generation().is_uuid_based()) {
        throw std::runtime_error(fmt::format("'{}' STORAGE only works with uuid_sstable_identifier enabled", _type));
    }

    return std::visit([this, &sst, type] (const auto& location) {
        using location_type = std::decay_t<decltype(location)>;
        if constexpr (std::is_same_v<location_type, sstring>) {
            return object_name(_bucket, location, sst.component_basename(type));
        } else {
            // Table-id location: the id itself is not part of the object name.
            return object_name(_bucket, sst.generation(), sstable_version_constants::get_component_map(sst.get_version()).at(type));
        }
    }, _location);
}
// Starts the creation of an sstable on object storage: registers an entry
// with status "creating" in the sstables registry, serializes the TOC into
// memory buffers and uploads them as the TOC object.
// Blocks on futures with .get(), so it has to run in async context.
void object_storage_base::open(sstable& sst) {
entry_descriptor desc(sst._generation, sst._version, sst._format, component_type::TOC);
sst.manager().sstables_registry().create_entry(owner(), status_creating, sst._state, std::move(desc)).get();
// The TOC is written into `bufs` through a memory_data_sink and uploaded below.
memory_data_sink_buffers bufs;
sst.write_toc(
file_writer(
output_stream<char>(
data_sink(
std::make_unique<memory_data_sink>(bufs)
)
)
)
);
put_object(make_object_name(sst, component_type::TOC), std::move(bufs)).get();
}
// Opens a component as a readable file backed by its object and offers it to
// the file I/O extensions. `flags`, `options` and `check_integrity` are not
// used to open the object itself.
future<file> object_storage_base::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
    file readable = make_readable_file(make_object_name(sst, type));
    return maybe_wrap_file(sst, type, flags, std::move(readable));
}
// Gives every registered file I/O extension a chance to wrap `sink`.
// TOC and TemporaryTOC components are never wrapped. If an extension fails,
// the sink is closed and the extension's exception is rethrown.
static future<data_sink> maybe_wrap_sink(const sstable& sst, component_type type, data_sink sink) {
if (type != component_type::TOC && type != component_type::TemporaryTOC) {
for (auto* ext : sst.manager().file_io_extensions()) {
// The exception is carried out of the catch block so that the close
// below is awaited outside of the handler.
std::exception_ptr p;
try {
sink = co_await ext->wrap_sink(sst, type, std::move(sink));
} catch (...) {
p = std::current_exception();
}
if (p) {
// NOTE(review): `sink` was passed to wrap_sink() with std::move above;
// confirm that closing it here is valid if the failing call consumed it.
co_await sink.close();
std::rethrow_exception(std::move(p));
}
}
}
co_return sink;
}
// Gives every registered file I/O extension a chance to wrap `src`, then
// restricts the result to the range [offset, offset + len).
// TOC and TemporaryTOC components are never wrapped. A failing extension
// fails the returned future with its exception.
static future<data_source> maybe_wrap_source(const sstable& sst, component_type type, data_source src, uint64_t offset, uint64_t len) {
    const bool is_toc = type == component_type::TOC || type == component_type::TemporaryTOC;
    if (!is_toc) {
        for (auto* extension : sst.manager().file_io_extensions()) {
            src = co_await extension->wrap_source(sst, type, std::move(src));
        }
    }
    co_return create_ranged_source(std::move(src), offset, len);
}
// Creates an upload sink for one of the Data, Index, Rows or Partitions
// components and offers it to the file I/O extensions.
future<data_sink> object_storage_base::make_data_or_index_sink(sstable& sst, component_type type) {
    SCYLLA_ASSERT(
        type == component_type::Data
        || type == component_type::Index
        || type == component_type::Rows
        || type == component_type::Partitions);
    // FIXME: if we have file size upper bound upfront, it's better to use make_upload_sink() instead
    auto upload_sink = make_data_upload_sink(make_object_name(sst, type), std::nullopt);
    return maybe_wrap_sink(sst, type, std::move(upload_sink));
}
// Creates a download source for the component's object, offers it to the
// file I/O extensions and limits it to the requested range. The file `f`
// and the stream options are not used by this implementation.
future<data_source>
object_storage_base::make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options options) const {
    auto download = _client->make_download_source(make_object_name(sst, type), abort_source());
    auto wrapped = co_await maybe_wrap_source(sst, type, std::move(download), offset, len);
    co_return wrapped;
}
// Reads that start at offset 0 go through the base class's download source.
// Any other offset is served from a readable file opened on the object
// (after the file I/O extensions had a chance to wrap it).
future<data_source>
s3_storage::make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options options) const {
    if (offset != 0) {
        file readable = co_await maybe_wrap_file(sst, type, open_flags::ro, _client->make_readable_file(make_object_name(sst, type), abort_source()));
        co_return make_file_data_source(std::move(readable), offset, len, std::move(options));
    }
    co_return co_await object_storage_base::make_data_or_index_source(sst, type, std::move(f), offset, len, std::move(options));
}
// Creates an upload sink for the component's object and offers it to the
// file I/O extensions. `oflags` and `options` are not used here.
future<data_sink> object_storage_base::make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) {
    auto upload_sink = make_upload_sink(make_object_name(sst, type));
    return maybe_wrap_sink(sst, type, std::move(upload_sink));
}
// Marks the sstable's registry entry as sealed.
future<> object_storage_base::seal(const sstable& sst) {
    auto& registry = sst.manager().sstables_registry();
    co_await registry.update_entry_status(owner(), sst.generation(), status_sealed);
}
// Updates the state recorded for the sstable in the registry. Changing the
// generation at the same time is not supported and fails with
// std::runtime_error.
future<> object_storage_base::change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) {
    const bool generation_changes = generation != sst._generation;
    if (generation_changes) {
        // The 'generation' field is clustering key in system.sstables and cannot be
        // changed. However, that's fine, state AND generation change means the sstable
        // is moved from upload directory and this is another issue for S3 (#13018)
        co_await coroutine::return_exception(std::runtime_error("Cannot change state and generation of an S3 object"));
    }
    auto& registry = sst.manager().sstables_registry();
    co_await registry.update_entry_state(owner(), sst.generation(), state);
}
// Removes the sstable from object storage.
//
// Steps, in order:
//   1. the registry entry status is set to status_removing;
//   2. the objects of all recognized components are deleted in parallel;
//   3. the registry entry itself is deleted.
future<> object_storage_base::wipe(const sstable& sst, sync_dir) noexcept {
    auto& registry = sst.manager().sstables_registry();
    auto drop_component = [this, &sst] (auto component) -> future<> {
        co_await delete_object(make_object_name(sst, component));
    };

    co_await registry.update_entry_status(owner(), sst.generation(), status_removing);
    co_await coroutine::parallel_for_each(sst._recognized_components, std::move(drop_component));
    co_await registry.delete_entry(owner(), sst.generation());
}
// Prepare step of an atomic deletion. Nothing is prepared here:
// the sstables argument is ignored and an empty context is returned.
future<atomic_delete_context> object_storage_base::atomic_delete_prepare(const std::vector<shared_sstable>&) const {
    // FIXME -- need atomicity, see #13567
    atomic_delete_context empty_ctx{};
    co_return empty_ctx;
}
// Completion step of an atomic deletion. This implementation does nothing
// with the context and resolves right away.
future<> object_storage_base::atomic_delete_complete(atomic_delete_context ctx) const {
co_return;
}
// Deletes the objects of the sstable described by a registry entry.
//
// The component list is read from the sstable's TOC object. If reading the
// TOC fails with a storage_io_error carrying ENOENT, the list stays empty;
// any other error is propagated. All listed components except the TOC are
// deleted in parallel, and the TOC object is deleted last.
future<> object_storage_base::remove_by_registry_entry(entry_descriptor desc) {
    std::vector<sstring> components;
    try {
        auto toc_object = object_name(_bucket, desc.generation,
                sstable_version_constants::get_component_map(desc.version).at(component_type::TOC));
        auto toc_file = make_readable_file(std::move(toc_object));
        components = co_await with_closeable(std::move(toc_file), [] (file& opened) {
            return sstable::read_and_parse_toc(opened);
        });
    } catch (const storage_io_error& err) {
        const bool toc_missing = err.code().value() == ENOENT;
        if (!toc_missing) {
            throw;
        }
    }

    co_await coroutine::parallel_for_each(components, [this, &desc] (sstring component) -> future<> {
        // The TOC is removed separately, after everything else
        if (component == sstable_version_constants::TOC_SUFFIX) {
            co_return;
        }
        co_await delete_object(object_name(_bucket, desc.generation, component));
    });

    co_await delete_object(object_name(_bucket, desc.generation, sstable_version_constants::TOC_SUFFIX));
}
// Deletes the object backing a single component of the sstable.
//
// A failed deletion is logged as a warning and otherwise ignored.
future<> object_storage_base::unlink_component(const sstable& sst, component_type type) noexcept {
    auto target = make_object_name(sst, type);
    try {
        co_await _client->delete_object(target);
    } catch (...) {
        auto failure = std::current_exception();
        sstlog.warn("Failed to delete {}: {}. Ignoring.", target, failure);
    }
}
// Snapshotting is not implemented for this storage type: every call is
// reported through on_internal_error(). None of the arguments are used.
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
on_internal_error(sstlog, "Snapshotting S3 objects not implemented");
// Keeps this function a coroutine
co_return;
}
// Creates the storage backend matching the given storage options.
//
//  - local options produce a filesystem_storage rooted at `dir`;
//    an empty `dir` is reported as an internal error;
//  - object storage options produce an s3_storage or a "GS"
//    object_storage_base, depending on the option type; an empty prefix or a
//    null table id as the location is reported as an internal error, and any
//    other type results in std::runtime_error.
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstable_state state) {
    using storage_ptr = std::unique_ptr<sstables::storage>;

    auto build_local = [state] (const data_dictionary::storage_options::local& loc) mutable -> storage_ptr {
        if (loc.dir.empty()) {
            on_internal_error(sstlog, "Local storage options is missing 'dir'");
        }
        return std::make_unique<sstables::filesystem_storage>(loc.dir.native(), state);
    };

    auto build_object_storage = [&] (const data_dictionary::storage_options::object_storage& os) mutable -> storage_ptr {
        const bool location_missing = std::visit(overloaded_functor {
            [] (const sstring& prefix) { return prefix.empty(); },
            [] (const table_id& owner) { return owner.id.is_null(); }
        }, os.location);
        if (location_missing) {
            on_internal_error(sstlog, fmt::format("{} storage options is missing 'location'", os.name()));
        }

        if (s_opts.is_s3_type()) {
            return std::make_unique<sstables::s3_storage>(manager.get_endpoint_client(os.endpoint), os.bucket, os.location, os.abort_source);
        } else if (s_opts.is_gs_type()) {
            return std::make_unique<sstables::object_storage_base>("GS", manager.get_endpoint_client(os.endpoint), os.bucket, os.location, os.abort_source);
        }
        throw std::runtime_error(fmt::format("Not implemented: '{}'", os.type));
    };

    return std::visit(overloaded_functor {
        std::move(build_local),
        std::move(build_object_storage)
    }, s_opts.value);
}
// Creates the directory layout of a table kept on local storage.
//
// For every configured data file directory, the table directory
// "<datadir>/<keyspace>/<table>-<table uuid without dashes>" is created with
// recursive_touch_directory(), followed by its "upload" and "staging"
// subdirectories. Directories are processed in parallel and every filesystem
// call goes through io_check().
//
// Returns local storage options whose `dir` is the table directory under the
// first configured data file directory. The `so` argument is not used here.
//
// Reports an internal error when no data file directory is configured,
// rather than indexing into an empty vector.
static future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const sstables_manager& mgr, const schema& s, const data_dictionary::storage_options::local& so) {
    const auto& data_dirs = mgr.get_config().data_file_directories;

    // The uuid part of the name is the same for all data directories, so it is computed once
    auto uuid_sstring = s.id().to_sstring();
    boost::erase_all(uuid_sstring, "-");

    std::vector<sstring> dirs;
    dirs.reserve(data_dirs.size());
    for (const auto& dd : data_dirs) {
        dirs.emplace_back(format("{}/{}/{}-{}", dd, s.ks_name(), s.cf_name(), uuid_sstring));
    }
    if (dirs.empty()) {
        // dirs[0] is needed below to build the resulting storage options
        on_internal_error(sstlog, "Cannot initialize local table storage: no data file directories configured");
    }

    co_await coroutine::parallel_for_each(dirs, [] (sstring dir) -> future<> {
        co_await io_check([&dir] { return recursive_touch_directory(dir); });
        co_await io_check([&dir] { return touch_directory(dir + "/upload"); });
        co_await io_check([&dir] { return touch_directory(dir + "/staging"); });
    });

    data_dictionary::storage_options nopts;
    nopts.value = data_dictionary::storage_options::local {
        .dir = fs::path(std::move(dirs[0])),
    };
    co_return make_lw_shared<const data_dictionary::storage_options>(std::move(nopts));
}
// Returns the table's directory under each of the given data file directories.
//
// The table-relative part is made of the last two components of `so.dir`
// (parent directory name / directory name), see how this path is formatted
// by init_table_storage() above. The result keeps the order of
// `data_file_directories`.
std::vector<std::filesystem::path> get_local_directories(const std::vector<sstring>& data_file_directories, const data_dictionary::storage_options::local& so) {
    const auto table_dir = so.dir.parent_path().filename() / so.dir.filename();

    std::vector<std::filesystem::path> result;
    result.reserve(data_file_directories.size());
    for (const auto& datadir : data_file_directories) {
        result.push_back(std::filesystem::path(datadir) / table_dir);
    }
    return result;
}
// Produces the storage options of a table kept on object storage.
//
// Bucket, endpoint and type are copied from `so`, while the location is set
// to the table's id. Nothing else is done here and `mgr` is not used.
static future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const sstables_manager& mgr, const schema& s, const data_dictionary::storage_options::object_storage& so) {
    data_dictionary::storage_options::object_storage table_opts {
        .bucket = so.bucket,
        .endpoint = so.endpoint,
        .location = s.id(),
        .type = so.type
    };
    data_dictionary::storage_options nopts;
    nopts.value = std::move(table_opts);
    co_return make_lw_shared<const data_dictionary::storage_options>(std::move(nopts));
}
// Initializes the storage of a table, dispatching on the kind of storage
// options to the matching init_table_storage() overload.
future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const sstables_manager& mgr, const schema& s, const data_dictionary::storage_options& so) {
    auto dispatch = [&mgr, &s] (const auto& concrete_opts) {
        return init_table_storage(mgr, s, concrete_opts);
    };
    co_return co_await std::visit(dispatch, so.value);
}
// Initializes the storage of a keyspace.
//
// For local storage the keyspace directory is created under the first
// configured data file directory (nothing happens if none is configured).
// For object storage there is nothing to do.
future<> init_keyspace_storage(const sstables_manager& mgr, const data_dictionary::storage_options& so, sstring ks_name) {
    // Named so that it stays alive for the whole co_await below
    auto visitor = overloaded_functor {
        [&mgr, &ks_name] (const data_dictionary::storage_options::local&) -> future<> {
            const auto& data_dirs = mgr.get_config().data_file_directories;
            if (data_dirs.size() == 0) {
                co_return;
            }
            auto ks_dir = format("{}/{}", data_dirs[0], ks_name);
            co_await io_check([&ks_dir] { return touch_directory(ks_dir); });
        },
        [] (const data_dictionary::storage_options::object_storage&) -> future<> {
            co_return;
        }
    };
    co_await std::visit(visitor, so.value);
}
// Tears down the storage of a table.
//
// For local storage this calls remove_table_directory_if_has_no_snapshots()
// on the table directory; options with an empty `dir` are reported as an
// internal error. For object storage there is nothing to do.
future<> destroy_table_storage(const data_dictionary::storage_options& so) {
    // Named so that it stays alive for the whole co_await below
    auto visitor = overloaded_functor {
        [] (const data_dictionary::storage_options::local& local_opts) -> future<> {
            const bool has_table_dir = !local_opts.dir.empty();
            if (!has_table_dir) {
                on_internal_error(sstlog, "Non-table local storage options");
            }
            co_await sstables::remove_table_directory_if_has_no_snapshots(local_opts.dir);
        },
        [] (const data_dictionary::storage_options::object_storage&) -> future<> {
            co_return;
        }
    };
    co_await std::visit(visitor, so.value);
}
} // namespace sstables