Files
scylladb/sstables/storage.cc
Ernest Zaslavsky 51285785fa storage: add method to create component source
Extend the storage interface with a public method to create a
`data_source` for any sstable component.
2026-01-21 16:40:12 +02:00

956 lines
44 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "storage.hh"
#include <cerrno>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/erase.hpp>
#include <exception>
#include <stdexcept>
#include <fmt/std.h>
#include <seastar/core/when_all.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>
#include <seastar/util/closeable.hh>
#include "db/config.hh"
#include "db/extensions.hh"
#include "sstables/exceptions.hh"
#include "sstables/object_storage_client.hh"
#include "sstables/sstable_directory.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_version.hh"
#include "sstables/integrity_checked_file_impl.hh"
#include "sstables/writer.hh"
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "utils/overloaded_functor.hh"
#include "utils/memory_data_sink.hh"
#include "utils/s3/client.hh"
#include "utils/exceptions.hh"
#include "utils/to_string.hh"
#include "utils/checked-file-impl.hh"
#include "utils/io-wrappers.hh"
namespace sstables {
// cannot define these classes in an anonymous namespace, as we need to
// declare these storage classes as "friend" of class sstable
// Storage backend that keeps every sstable component as a regular file in a
// directory on the local filesystem.
class filesystem_storage final : public sstables::storage {
// Directory handle built from the `dir` constructor argument as-is.
mutable opened_directory _base_dir;
// Directory the components currently live in: make_path(dir, state) at
// construction, replaced by change_dir() when the sstable is moved.
mutable opened_directory _dir;
std::optional<std::filesystem::path> _temp_dir; // Valid while the sstable is being created, until sealed
private:
// What create_links_common() does once all components have been linked.
enum class link_mode {
// Remove the destination TemporaryTOC, committing the destination sstable.
default_mode,
// Move the destination TemporaryTOC to the source directory, marking the
// source sstable for removal.
mark_for_removal,
// Skip linking the TOC and keep the destination TemporaryTOC.
leave_unsealed,
};
// Builds the name of component `comp` for `sst`'s keyspace, table, version
// and format, placed in `dir` and using generation `gen`.
template <typename Comp>
requires std::is_same_v<Comp, component_type> || std::is_same_v<Comp, sstring>
static auto filename(const sstable& sst, sstring dir, generation_type gen, Comp comp) {
return sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, gen, sst._format, comp);
}
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
future<> remove_temp_dir();
virtual future<> create_links(const sstable& sst, const std::filesystem::path& dir) const override;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, link_mode mode) const;
future<> touch_temp_dir(const sstable& sst);
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay) override;
future<> rename_new_file(const sstable& sst, sstring from_name, sstring to_name) const;
// Points _dir at new_dir and closes the previously opened directory.
future<> change_dir(sstring new_dir) {
auto old_dir = std::exchange(_dir, opened_directory(new_dir));
return old_dir.close();
}
virtual future<> change_dir_for_test(sstring nd) override {
return change_dir(nd);
}
public:
explicit filesystem_storage(sstring dir, sstable_state state)
: _base_dir(dir)
, _dir(make_path(dir, state))
{}
virtual future<> seal(const sstable& sst) override;
virtual future<> snapshot(const sstable& sst, sstring name) const override;
virtual future<> clone(const sstable& sst, generation_type gen, bool leave_unsealed) const override;
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
future<data_source> make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
future<data_source> make_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;
// Nothing to tear down for filesystem storage.
virtual future<> destroy(const sstable& sst) override { return make_ready_future<>(); }
virtual future<atomic_delete_context> atomic_delete_prepare(const std::vector<shared_sstable>&) const override;
virtual future<> atomic_delete_complete(atomic_delete_context ctx) const override;
virtual future<> remove_by_registry_entry(entry_descriptor desc) override;
// Free space as reported by seastar::fs_avail() for the current directory.
virtual future<uint64_t> free_space() const override {
return seastar::fs_avail(prefix());
}
virtual future<> unlink_component(const sstable& sst, component_type) noexcept override;
// The current sstable directory, in native string form.
virtual sstring prefix() const override { return _dir.native(); }
};
/// Builds the sink used to write one of the bulk components of \c sst
/// (Data, Index, Rows or Partitions).
///
/// The file handle that \c sst already holds for the component is moved
/// into the sink. Any other component type is a programming error.
future<data_sink> filesystem_storage::make_data_or_index_sink(sstable& sst, component_type type) {
    SCYLLA_ASSERT(
            type == component_type::Data
            || type == component_type::Index
            || type == component_type::Rows
            || type == component_type::Partitions);

    file_output_stream_options sink_options;
    sink_options.buffer_size = sst.sstable_buffer_size;
    sink_options.write_behind = 10;

    // Hands the chosen component file over to a file-backed sink.
    auto sink_for = [&sink_options] (auto& component_file) {
        return make_file_data_sink(std::move(component_file), sink_options);
    };

    switch (type) {
    case component_type::Data:
        return sink_for(sst._data_file);
    case component_type::Index:
        return sink_for(sst._index_file);
    case component_type::Rows:
        return sink_for(sst._rows_file);
    case component_type::Partitions:
        return sink_for(sst._partitions_file);
    default:
        // Unreachable thanks to the assertion above.
        abort();
    }
}
/// Creates a source reading \c len bytes at \c offset of the Data or Index
/// component; asserts on any other component type and otherwise defers to
/// make_source().
future<data_source> filesystem_storage::make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const {
    SCYLLA_ASSERT(type == component_type::Index || type == component_type::Data);
    auto source = co_await make_source(sst, type, std::move(f), offset, len, std::move(opt));
    co_return std::move(source);
}
/// Creates a source that reads \c len bytes starting at \c offset from the
/// already opened file \c f. The sstable and component type are not needed
/// for filesystem storage.
future<data_source> filesystem_storage::make_source(sstable&, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const {
    auto file_source = make_file_data_source(std::move(f), offset, len, std::move(opt));
    co_return std::move(file_source);
}
/// Creates a new file for component \c type (opened with \c oflags) and
/// returns a sink writing to it with the given stream \c options.
future<data_sink> filesystem_storage::make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) {
    // Turns the freshly created component file into a file-backed sink.
    auto into_sink = [stream_options = std::move(options)] (file component_file) mutable {
        return make_file_data_sink(std::move(component_file), std::move(stream_options));
    };
    return sst.new_sstable_component_file(sst._write_error_handler, type, oflags).then(std::move(into_sink));
}
/// Opens \c name with DMA. The integrity-checking variant is used only when
/// \c check_integrity is requested and \c flags is not exactly
/// open_flags::ro; every other combination opens the plain file.
static future<file> open_sstable_component_file_non_checked(std::string_view name, open_flags flags, file_open_options options,
        bool check_integrity) noexcept {
    if (!check_integrity || flags == open_flags::ro) {
        return open_file_dma(name, flags, options);
    }
    return open_integrity_checked_file_dma(name, flags, options);
}
/// Renames \c from_name to \c to_name through the sstable's write I/O check.
/// A failure is logged and then propagated to the caller unchanged.
future<> filesystem_storage::rename_new_file(const sstable& sst, sstring from_name, sstring to_name) const {
    auto log_and_forward = [from_name, to_name] (std::exception_ptr failure) {
        sstlog.error("Could not rename SSTable component {} to {}. Found exception: {}", from_name, to_name, failure);
        return make_exception_future<>(failure);
    };
    return sst.sstable_write_io_check(rename_file, from_name, to_name).handle_exception(std::move(log_and_forward));
}
/// Passes the (future) file through every registered file I/O extension,
/// in order. TOC and TemporaryTOC components are never wrapped.
///
/// An extension that yields a null file leaves the current file in place.
/// with_file_close_on_failure() is used so the file is closed if an
/// extension fails.
static future<file> maybe_wrap_file(const sstable& sst, component_type type, open_flags flags, future<file> f) {
    const bool is_toc = type == component_type::TOC || type == component_type::TemporaryTOC;
    if (is_toc) {
        return f;
    }
    for (auto* extension : sst.manager().file_io_extensions()) {
        auto apply_extension = [extension, &sst, type, flags] (file current) {
            return extension->wrap_file(sst, type, current, flags).then([current] (file wrapped) mutable {
                return wrapped ? wrapped : std::move(current);
            });
        };
        f = with_file_close_on_failure(std::move(f), std::move(apply_extension));
    }
    return f;
}
/// Convenience overload for an already opened file: wraps it in a ready
/// future and defers to the future-taking overload.
static future<file> maybe_wrap_file(const sstable& sst, component_type type, open_flags flags, file f) {
    auto ready_file = make_ready_future<file>(std::move(f));
    return maybe_wrap_file(sst, type, flags, std::move(ready_file));
}
/// Opens the file of component \c type.
///
/// When \c flags contains both `create` and `exclusive`, the file is treated
/// as new: it is created in the temporary directory (if one is set) and then
/// renamed to its final name, closing the file if the rename fails.
/// Otherwise the file is opened in the sstable directory. In both cases the
/// result goes through the file I/O extensions.
future<file> filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
    const auto creation_mask = open_flags::create | open_flags::exclusive;
    const bool creating = (flags & creation_mask) == creation_mask;

    auto directory = creating && _temp_dir ? *_temp_dir : _dir.path();
    auto path = directory / sst.component_basename(type);
    auto opened = open_sstable_component_file_non_checked(path.native(), flags, options, check_integrity);

    if (creating) {
        // Move the new file to its final name, keeping the handle open.
        auto move_into_place = [this, &sst, type, path = std::move(path)] (file fd) mutable {
            return rename_new_file(sst, path.native(), fmt::to_string(sst.filename(type))).then([fd = std::move(fd)] () mutable {
                return make_ready_future<file>(std::move(fd));
            });
        };
        opened = with_file_close_on_failure(std::move(opened), std::move(move_into_place));
    }
    return maybe_wrap_file(sst, type, flags, std::move(opened));
}
// Starts the creation of a new sstable on disk: makes sure the temporary
// directory exists, writes the TemporaryTOC and syncs the sstable directory.
// Throws std::runtime_error if a TOC for this generation already exists.
// Blocks on futures with .get() (the declaration notes it runs in async context).
void filesystem_storage::open(sstable& sst) {
touch_temp_dir(sst).get();
// Writing TOC content to temporary file.
// If creation of temporary TOC failed, it implies that boot failed to
// delete a sstable with temporary for this column family, or there is a
// sstable being created in parallel with the same generation.
file_output_stream_options options;
options.buffer_size = 4096;
// create | exclusive makes the open fail if the TemporaryTOC already exists.
auto sink = make_component_sink(sst, component_type::TemporaryTOC,
open_flags::wo |
open_flags::create |
open_flags::exclusive,
options).get();
auto w = file_writer(output_stream<char>(std::move(sink)), component_name(sst, component_type::TemporaryTOC));
bool toc_exists = file_exists(fmt::to_string(sst.filename(component_type::TOC))).get();
if (toc_exists) {
// TOC will exist at this point if write_components() was called with
// the generation of a sstable that exists.
// Undo the TemporaryTOC created above before reporting the conflict.
w.close();
remove_file(fmt::to_string(sst.filename(component_type::TemporaryTOC))).get();
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name()));
}
sst.write_toc(std::move(w));
// Flushing parent directory to guarantee that temporary TOC file reached
// the disk.
_dir.sync(sst._write_error_handler).get();
}
/// Seals the sstable: once every component is known to be on disk, the
/// TemporaryTOC is renamed to TOC.
///
/// Steps: drop the temporary directory, sync the sstable directory, rename
/// the TOC, then sync the directory again.
future<> filesystem_storage::seal(const sstable& sst) {
    co_await remove_temp_dir();

    // Every component must have reached the disk before the TOC is published.
    co_await _dir.sync(sst._write_error_handler);

    // The TOC stops being temporary at this point.
    auto temporary_toc = fmt::to_string(sst.filename(component_type::TemporaryTOC));
    auto final_toc = fmt::to_string(sst.filename(component_type::TOC));
    co_await sst.sstable_write_io_check(rename_file, std::move(temporary_toc), std::move(final_toc));
    co_await _dir.sync(sst._write_error_handler);

    // Getting here means the sstable should be safe on disk.
    sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", sst._generation, sst._schema->ks_name(), sst._schema->cf_name());
}
/// Makes sure the per-sstable temporary directory
/// (`<generation><tempdir_extension>` under the sstable directory) has been
/// touched, and remembers its path. Does nothing if it is already set.
future<> filesystem_storage::touch_temp_dir(const sstable& sst) {
    if (!_temp_dir) {
        auto candidate = _dir.path() / fmt::format("{}{}", sst._generation, tempdir_extension);
        sstlog.debug("Touching temp_dir={}", candidate);
        co_await sst.sstable_touch_directory_io_check(candidate);
        // Record the path only after the directory was touched successfully.
        _temp_dir = std::move(candidate);
    }
}
/// Removes the temporary directory, if one is set, and forgets its path.
/// A removal failure is logged and rethrown; the path is kept in that case.
future<> filesystem_storage::remove_temp_dir() {
    if (_temp_dir) {
        sstlog.debug("Removing temp_dir={}", _temp_dir);
        try {
            co_await remove_file(_temp_dir->native());
        } catch (...) {
            sstlog.error("Could not remove temporary directory: {}", std::current_exception());
            throw;
        }
        _temp_dir.reset();
    }
}
/// Two stat results describe the same file when both the device id and the
/// inode number match.
static bool is_same_file(const seastar::stat_data& sd1, const seastar::stat_data& sd2) noexcept {
    if (sd1.device_id != sd2.device_id) {
        return false;
    }
    return sd1.inode_number == sd2.inode_number;
}
/// Stats both paths and resolves to true when they refer to the same file
/// (same device id and inode number). Fails if either stat fails.
static future<bool> same_file(sstring path1, sstring path2) noexcept {
    auto both_stats = when_all_succeed(file_stat(std::move(path1)), file_stat(std::move(path2)));
    return both_stats.then_unpack([] (seastar::stat_data first, seastar::stat_data second) {
        return is_same_file(first, second);
    });
}
// Hard-links oldpath to newpath in a way that supports replay: an EEXIST
// failure from link_file is treated as success when newpath is already a
// hard link to oldpath. Any other failure (or EEXIST for a different file)
// is propagated.
future<> idempotent_link_file(sstring oldpath, sstring newpath) noexcept {
    std::exception_ptr failure;
    bool target_exists = false;
    try {
        co_await link_file(oldpath, newpath);
        co_return;
    } catch (const std::system_error& err) {
        target_exists = err.code().value() == EEXIST;
        failure = std::current_exception();
    } catch (...) {
        failure = std::current_exception();
    }
    // co_await is not allowed inside a handler, so the replay check runs here.
    if (target_exists && (co_await same_file(oldpath, newpath))) {
        co_return;
    }
    co_await coroutine::return_exception_ptr(std::move(failure));
}
// Check if the operation is replayed, possibly when moving sstables
// from staging to the base dir, for example, right after create_links completes,
// and right before deleting the source links.
// We end up in two valid sstables in this case, so make create_links idempotent.
//
// For every component whose destination name already exists, the destination
// must be the same file as the source; otherwise the call fails with
// malformed_sstable_exception.
future<> filesystem_storage::check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen,
        const std::vector<std::pair<sstables::component_type, sstring>>& comps) const {
    return parallel_for_each(comps, [this, &sst, &dst_dir, dst_gen] (const auto& entry) mutable -> future<> {
        // The captures are only read here, before the first suspension point.
        auto comp_name = entry.second;
        auto src = filename(sst, _dir.native(), sst._generation, comp_name);
        auto dst = filename(sst, dst_dir, dst_gen, comp_name);

        if (!(co_await file_exists(dst))) {
            // Nothing at the destination yet: not a replay for this component.
            co_return;
        }

        future<bool> comparison = co_await coroutine::as_future(same_file(src, dst));
        if (comparison.failed()) {
            auto failure = comparison.get_exception();
            sstlog.error("Error while linking SSTable: {} to {}: {}", src, dst, failure);
            co_await coroutine::return_exception_ptr(std::move(failure));
        }
        if (!comparison.get()) {
            auto msg = format("Error while linking SSTable: {} to {}: File exists", src, dst);
            sstlog.error("{}", msg);
            co_await coroutine::return_exception(malformed_sstable_exception(msg));
        }
    });
}
/// create_links_common links all component files from the sstable directory to
/// the given destination directory, using the provided generation.
///
/// It first checks if this is a replay of a previous
/// create_links call, by testing if the destination names already
/// exist, and if so, if they point to the same inodes as the
/// source names. Otherwise, we return an error.
/// This is an indication that something went wrong.
///
/// Creating the links is done by:
/// First, linking the source TOC component to the destination TemporaryTOC,
/// to mark the destination for rollback, in case we crash mid-way.
/// Then, all components are linked.
///
/// Note that if scylla crashes at this point, the destination SSTable
/// will have both a TemporaryTOC file and a regular TOC file.
/// It should be deleted on restart, thus rolling the operation backwards.
///
/// Eventually, if \c mark_for_removal is unset, the destination
/// TemporaryTOC is removed, to "commit" the destination sstable;
///
/// Otherwise, if \c mark_for_removal is set, the TemporaryTOC at the destination
/// is moved to the source directory to mark the source sstable for removal,
/// thus atomically toggling crash recovery from roll-back to roll-forward.
///
/// Similar to the scenario described above, crashing at this point
/// would leave the source sstable marked for removal, possibly
/// having both a TemporaryTOC file and a regular TOC file, and
/// then the source sstable should be deleted on restart, rolling the
/// operation forward.
///
/// Note that idempotent versions of link_file and rename_file
/// are used. These versions handle EEXIST errors that may happen
/// when the respective operations are replayed.
///
/// \param sst - the sstable to work on
/// \param dst_dir - the destination directory.
/// \param generation - the generation of the destination sstable
/// \param mode - what will be done after all components were linked
/// default_mode - remove the destination TemporaryTOC, committing the destination sstable
/// mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
/// leave_unsealed - leaves the destination sstable unsealed
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, link_mode mode) const {
// They're mutually exclusive, so we can assume only one is set.
bool mark_for_removal = mode == link_mode::mark_for_removal;
bool leave_unsealed = mode == link_mode::leave_unsealed;
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
auto comps = sst.all_components();
// Fails if a destination name exists but is not a link to the matching source.
co_await check_create_links_replay(sst, dst_dir, generation, comps);
// TemporaryTOC is always first, TOC is always last
auto dst = filename(sst, dst_dir, generation, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(idempotent_link_file, fmt::to_string(sst.filename(component_type::TOC)), std::move(dst));
auto dir = opened_directory(dst_dir);
// Persist the TemporaryTOC marker before linking the actual components.
co_await dir.sync(sst._write_error_handler);
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation, leave_unsealed] (auto p) {
// Skips the linking of TOC file if the destination will be left unsealed.
if (leave_unsealed && p.first == component_type::TOC) {
return make_ready_future<>();
}
auto src = filename(sst, _dir.native(), sst._generation, p.second);
auto dst = filename(sst, dst_dir, generation, p.second);
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
});
co_await dir.sync(sst._write_error_handler);
auto dst_temp_toc = filename(sst, dst_dir, generation, component_type::TemporaryTOC);
if (mark_for_removal) {
// Now that the source sstable is linked to new_dir, mark the source links for
// deletion by leaving a TemporaryTOC file in the source directory.
auto src_temp_toc = filename(sst, _dir.native(), sst._generation, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
co_await _dir.sync(sst._write_error_handler);
} else if (!leave_unsealed) {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
// This is bypassed if destination will be left unsealed.
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
}
// Final sync of the destination directory, done in every mode.
co_await dir.sync(sst._write_error_handler);
// NOTE(review): `dir` is closed only on this success path; confirm that
// opened_directory tolerates being destroyed unclosed when a step above fails.
co_await dir.close();
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
}
/// Links every component of \c sst into \c dir, keeping the sstable's own
/// generation and committing the destination (default link mode).
future<> filesystem_storage::create_links(const sstable& sst, const std::filesystem::path& dir) const {
    constexpr auto mode = link_mode::default_mode;
    return create_links_common(sst, dir.native(), sst._generation, mode);
}
/// Snapshots \c sst by linking all its components, under the same
/// generation, into the directory \c name relative to the base directory.
/// The directory is touched first.
future<> filesystem_storage::snapshot(const sstable& sst, sstring name) const {
    std::filesystem::path target = _base_dir.path() / name;
    co_await sst.sstable_touch_directory_io_check(target);
    constexpr auto mode = link_mode::default_mode;
    co_await create_links_common(sst, target.native(), sst._generation, mode);
}
/// Clones \c sst inside its current directory under generation \c gen.
/// With \c leave_unsealed the clone is left unsealed; otherwise it is
/// committed like a regular link.
future<> filesystem_storage::clone(const sstable& sst, generation_type gen, bool leave_unsealed) const {
    const auto mode = leave_unsealed ? link_mode::leave_unsealed : link_mode::default_mode;
    co_await create_links_common(sst, _dir.path().native(), std::move(gen), mode);
}
/// Moves \c sst to \c new_dir under \c new_generation.
///
/// The components are first linked into \c new_dir with the source marked
/// for removal, then this storage is re-pointed at \c new_dir and the old
/// links (including the source TemporaryTOC) are removed.
///
/// \param delay_commit - when null, both the old and the new directory are
///        synced before returning and a sync failure fails the move;
///        otherwise both directories are recorded in it for the caller to
///        sync later.
future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
    co_await touch_directory(new_dir);
    sstring old_dir = _dir.native();
    sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
            sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
    co_await create_links_common(sst, new_dir, new_generation, link_mode::mark_for_removal);
    co_await change_dir(new_dir);
    generation_type old_generation = sst._generation;
    // The destination is complete; drop the source links.
    co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
        return sst.sstable_write_io_check(remove_file, filename(sst, old_dir, old_generation, p.second));
    });
    // Finally remove the TemporaryTOC that marked the source for removal.
    auto temp_toc = sstable_version_constants::get_component_map(sst._version).at(component_type::TemporaryTOC);
    co_await sst.sstable_write_io_check(remove_file, filename(sst, old_dir, old_generation, temp_toc));
    if (delay_commit == nullptr) {
        // when_all() always resolves successfully and leaves any failure inside
        // the futures it returns, so discarding its result would silently drop a
        // failed directory sync. when_all_succeed() still waits for both syncs
        // but propagates the failure.
        co_await when_all_succeed(sst.sstable_write_io_check(sync_directory, old_dir), _dir.sync(sst._write_error_handler)).discard_result();
    } else {
        delay_commit->_dirs.insert(old_dir);
        delay_commit->_dirs.insert(new_dir);
    }
}
/// Moves \c sst to the directory that corresponds to \c state.
///
/// States map to sub-directories of the base directory, except the normal
/// state which maps to the base directory itself. Nothing happens when the
/// sstable is already in the requested state.
future<> filesystem_storage::change_state(const sstable& sst, sstable_state state, generation_type new_generation, delayed_commit_changes* delay_commit) {
    auto to = state_to_dir(state);
    auto path = _dir.path();
    auto current = path.filename().native();

    // The normal state lives in the base directory, so it cannot be
    // recognised by name. The best that can be done is to rule out every
    // other known state directory.
    const bool in_state_subdir = current == staging_dir || current == upload_dir || current == quarantine_dir;
    if (!in_state_subdir) {
        current = normal_dir;
        path = path / to;
    } else if (to == quarantine_dir && current != staging_dir) {
        // Legacy exception -- quarantine from anything but staging
        // moves to the current directory quarantine subdir
        path = path / to;
    } else {
        path = path.parent_path() / to;
    }

    if (current == to) {
        co_return; // Already there
    }

    sstlog.info("Moving sstable {} to {}", sst.get_filename(), path);
    co_await move(sst, path.native(), std::move(new_generation), delay_commit);
}
/// Returns the parent directory of the canonical form of \c fname.
/// fs::canonical() throws if the path cannot be resolved.
static inline fs::path parent_path(const sstring& fname) {
    const fs::path resolved = fs::canonical(fs::path(fname));
    return resolved.parent_path();
}
// Deletes all files of the sstable, best-effort: failures are logged and
// ignored, never propagated.
// The TOC is first turned into a temporary TOC (make_toc_temporary), then all
// other components are removed, and finally the temporary TOC itself.
// The temporary directory, if still set, is removed recursively afterwards.
future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// We must be able to generate toc_filename()
// in order to delete the sstable.
// Running out of memory here will terminate.
auto name = [&sst] () noexcept {
memory::scoped_critical_alloc_section _;
return fmt::to_string(sst.toc_filename());
}();
try {
auto new_toc_name = co_await make_toc_temporary(name, sync);
// An empty name skips the removal of the components altogether.
if (!new_toc_name.empty()) {
auto dir_name = parent_path(new_toc_name);
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, &dir_name] (auto component) -> future<> {
if (component.first == component_type::TOC) {
// already renamed
co_return;
}
auto fname = filename(sst, dir_name.native(), sst._generation, component.second);
try {
co_await sst.sstable_write_io_check(remove_file, fname);
} catch (...) {
// A component that is already gone is not an error.
if (!is_system_error_errno(ENOENT)) {
throw;
}
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
}
});
if (sync) {
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
}
// The temporary TOC goes last, once every other component was removed.
co_await sst.sstable_write_io_check(remove_file, new_toc_name);
}
} catch (...) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
// b. in the future sstables_manager is planned to handle sstables deletion.
// c. Eventually we may want to record these failures in a system table
// and notify the administrator about that for manual handling (rather than aborting).
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, std::current_exception());
}
if (_temp_dir) {
try {
co_await recursive_remove_directory(*_temp_dir);
// Forget the path only after it was removed successfully.
_temp_dir.reset();
} catch (...) {
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, std::current_exception());
}
}
}
/// Prepares an atomic deletion of \c ssts: collects the storage prefix of
/// every sstable and creates the pending-deletion log in the base directory.
future<atomic_delete_context> filesystem_storage::atomic_delete_prepare(const std::vector<shared_sstable>& ssts) const {
    atomic_delete_context ctx;
    for (const auto& sst : ssts) {
        ctx.prefixes.insert(sst->_storage->prefix());
    }
    ctx.pending_delete_log = co_await sstable_directory::create_pending_deletion_log(_base_dir, ssts);
    co_return std::move(ctx);
}
/// Completes an atomic deletion: syncs every directory recorded in \c ctx,
/// then removes the pending-deletion log. Failing to remove the log is
/// logged and ignored.
future<> filesystem_storage::atomic_delete_complete(atomic_delete_context ctx) const {
    co_await coroutine::parallel_for_each(ctx.prefixes, [] (const auto& directory) {
        return sync_directory(directory);
    });

    // Once all sstables are deleted, the log file can be removed.
    // Note: the log file will be removed also if unlink failed to remove
    // any sstable and ignored the error.
    const auto& pending_log = ctx.pending_delete_log;
    try {
        co_await remove_file(pending_log);
        sstlog.debug("{} removed.", pending_log);
    } catch (...) {
        sstlog.warn("Error removing {}: {}. Ignoring.", pending_log, std::current_exception());
    }
}
// Not supported for filesystem storage: calling this is reported as an
// internal error. `desc` is ignored.
future<> filesystem_storage::remove_by_registry_entry(entry_descriptor desc) {
on_internal_error(sstlog, "Filesystem storage doesn't keep its entries in registry");
}
/// Removes the file of a single component. Best-effort: any failure,
/// including failing to build the file name, is logged and ignored.
future<> filesystem_storage::unlink_component(const sstable& sst, component_type type) noexcept {
    // Declared outside the try block so the handler can still report it.
    std::string component_path;
    try {
        component_path = fmt::to_string(sst.filename(type));
        co_await sst.sstable_write_io_check(remove_file, component_path);
    } catch (...) {
        // Log and ignore the failure since there is nothing much we can do about it at this point.
        // a. Compaction will retry deleting the sstable in the next pass, and
        // b. in the future sstables_manager is planned to handle sstables deletion.
        // c. Eventually we may want to record these failures in a system table
        //    and notify the administrator about that for manual handling (rather than aborting).
        sstlog.warn("Failed to delete {}: {}. Ignoring.", component_path, std::current_exception());
    }
}
// Common base of the object-storage backends. Components are stored as
// objects in a bucket and accessed through an object_storage_client.
class object_storage_base : public sstables::storage {
protected:
    // Backend name; used in error messages (see make_object_name()).
    sstring _type;
    shared_ptr<sstables::object_storage_client> _client;
    sstring _bucket;
    // Either a plain string prefix, or the id of the table owning the
    // sstable's registry entries.
    std::variant<sstring, table_id> _location;
    // Handed to the client calls that accept an abort source.
    seastar::abort_source* _as;

    // Status strings stored in the sstables registry.
    static constexpr auto status_creating = "creating";
    static constexpr auto status_sealed = "sealed";
    static constexpr auto status_removing = "removing";

    object_name make_object_name(const sstable& sst, component_type type) const;

    // Returns the owning table id. Holding a string prefix instead is
    // reported as an internal error.
    table_id owner() const {
        if (std::holds_alternative<sstring>(_location)) {
            on_internal_error(sstlog, format("Storage holds {} prefix, but registry owner is expected", std::get<sstring>(_location)));
        }
        return std::get<table_id>(_location);
    }

    seastar::abort_source* abort_source() const {
        return _as;
    }

public:
    object_storage_base(sstring type, shared_ptr<sstables::object_storage_client> client, sstring bucket, std::variant<sstring, table_id> loc, seastar::abort_source* as)
        : _type(std::move(type)) // by-value argument: move it like the others
        , _client(std::move(client))
        , _bucket(std::move(bucket))
        , _location(std::move(loc))
        , _as(as)
    {}

    future<> seal(const sstable& sst) override;
    future<> snapshot(const sstable& sst, sstring name) const override;
    future<> clone(const sstable& sst, generation_type gen, bool leave_unsealed) const override;
    future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
    // runs in async context
    void open(sstable& sst) override;
    future<> wipe(const sstable& sst, sync_dir) noexcept override;
    future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
    future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
    future<data_source> make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
    future<data_source> make_source(sstable& sst, component_type type, file, uint64_t offset, uint64_t len, file_input_stream_options) const override;
    future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;

    // Nothing to tear down.
    future<> destroy(const sstable& sst) override {
        return make_ready_future<>();
    }

    future<atomic_delete_context> atomic_delete_prepare(const std::vector<shared_sstable>&) const override;
    future<> atomic_delete_complete(atomic_delete_context ctx) const override;
    future<> remove_by_registry_entry(entry_descriptor desc) override;

    future<uint64_t> free_space() const override {
        // assumes infinite space on s3/gs (https://aws.amazon.com/s3/faqs/#How_much_data_can_I_store).
        return make_ready_future<uint64_t>(std::numeric_limits<uint64_t>::max());
    }

    future<> unlink_component(const sstable& sst, component_type) noexcept override;

    // The location formatted as a string, whichever alternative it holds.
    sstring prefix() const override {
        return std::visit([] (const auto& v) { return fmt::to_string(v); }, _location);
    }

    // Thin forwarders to the client; the abort source is added where the
    // client call accepts one.
    future<> put_object(object_name name, ::memory_data_sink_buffers bufs) {
        return _client->put_object(std::move(name), std::move(bufs), abort_source());
    }
    future<> delete_object(object_name name) {
        return _client->delete_object(std::move(name));
    }
    file make_readable_file(object_name name) {
        return _client->make_readable_file(std::move(name), abort_source());
    }
    data_sink make_data_upload_sink(object_name name, std::optional<unsigned> max_parts_per_piece) {
        return _client->make_data_upload_sink(std::move(name), max_parts_per_piece, abort_source());
    }
    data_sink make_upload_sink(object_name name) {
        return _client->make_upload_sink(std::move(name), abort_source());
    }
};
// Object storage backend registered under the type name "S3".
// It only overrides how component sources are created.
class s3_storage : public object_storage_base {
public:
s3_storage(shared_ptr<sstables::object_storage_client> client, sstring bucket, std::variant<sstring, table_id> loc, seastar::abort_source* as)
: object_storage_base("S3", std::move(client), std::move(bucket), std::move(loc), as)
{}
future<data_source> make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
future<data_source> make_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options opt) const override;
};
/// Builds the object name of component \c type of \c sst.
///
/// With a string prefix as location the name is made of the bucket, the
/// prefix and the component base name; with an owning table id it is made
/// of the bucket, the sstable generation and the component's name from the
/// version's component map.
///
/// Throws std::runtime_error unless the sstable generation is UUID-based.
object_name object_storage_base::make_object_name(const sstable& sst, component_type type) const {
    const bool uuid_generation = sst.generation().is_uuid_based();
    if (!uuid_generation) {
        throw std::runtime_error(fmt::format("'{}' STORAGE only works with uuid_sstable_identifier enabled", _type));
    }

    const auto by_prefix = [&] (const sstring& prefix) {
        return object_name(_bucket, prefix, sst.component_basename(type));
    };
    const auto by_owner = [&] (const table_id&) {
        return object_name(_bucket, sst.generation(), sstable_version_constants::get_component_map(sst.get_version()).at(type));
    };
    return std::visit(overloaded_functor { by_prefix, by_owner }, _location);
}
/// Starts the creation of a new sstable in object storage: registers an
/// entry with status "creating" for the owning table, then renders the TOC
/// into memory and uploads it as the TOC object.
/// Blocks on futures with .get().
void object_storage_base::open(sstable& sst) {
    entry_descriptor desc(sst._generation, sst._version, sst._format, component_type::TOC);
    sst.manager().sstables_registry().create_entry(owner(), status_creating, sst._state, std::move(desc)).get();

    // The TOC is first written into in-memory buffers.
    memory_data_sink_buffers bufs;
    auto memory_sink = data_sink(std::make_unique<memory_data_sink>(bufs));
    auto toc_stream = output_stream<char>(std::move(memory_sink));
    sst.write_toc(file_writer(std::move(toc_stream)));

    auto toc_object = make_object_name(sst, component_type::TOC);
    put_object(std::move(toc_object), std::move(bufs)).get();
}
/// Opens the object of component \c type as a readable file and passes it
/// through the file I/O extensions. \c options and \c check_integrity are
/// not used by this backend.
future<file> object_storage_base::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
    auto component_object = make_object_name(sst, type);
    auto remote_file = make_readable_file(std::move(component_object));
    return maybe_wrap_file(sst, type, flags, std::move(remote_file));
}
/// Passes \c sink through every registered file I/O extension, in order.
/// TOC and TemporaryTOC components are never wrapped.
///
/// If an extension fails, the sink is closed and the failure is rethrown.
static future<data_sink> maybe_wrap_sink(const sstable& sst, component_type type, data_sink sink) {
    const bool is_toc = type == component_type::TOC || type == component_type::TemporaryTOC;
    if (!is_toc) {
        for (auto* extension : sst.manager().file_io_extensions()) {
            std::exception_ptr failure;
            try {
                sink = co_await extension->wrap_sink(sst, type, std::move(sink));
            } catch (...) {
                failure = std::current_exception();
            }
            if (!failure) {
                continue;
            }
            // co_await is not allowed inside a handler, so the close happens here.
            co_await sink.close();
            std::rethrow_exception(std::move(failure));
        }
    }
    co_return sink;
}
/// Passes \c src through every registered file I/O extension, in order,
/// then restricts the result to \c len bytes starting at \c offset.
/// TOC and TemporaryTOC components are never wrapped (the range is still
/// applied).
///
/// A failing extension fails the returned future with that exception.
static future<data_source> maybe_wrap_source(const sstable& sst, component_type type, data_source src, uint64_t offset, uint64_t len) {
    if (type != component_type::TOC && type != component_type::TemporaryTOC) {
        for (auto* ext : sst.manager().file_io_extensions()) {
            // No cleanup runs on failure, so the exception simply propagates
            // out of the coroutine; catching it only to rethrow adds nothing.
            src = co_await ext->wrap_source(sst, type, std::move(src));
        }
    }
    co_return create_ranged_source(std::move(src), offset, len);
}
/// Builds the upload sink for one of the bulk components (Data, Index, Rows
/// or Partitions) and passes it through the file I/O extensions. Any other
/// component type is a programming error.
future<data_sink> object_storage_base::make_data_or_index_sink(sstable& sst, component_type type) {
    SCYLLA_ASSERT(
            type == component_type::Data
            || type == component_type::Index
            || type == component_type::Rows
            || type == component_type::Partitions);
    // FIXME: if we have file size upper bound upfront, it's better to use make_upload_sink() instead
    auto component_object = make_object_name(sst, type);
    auto upload_sink = make_data_upload_sink(std::move(component_object), std::nullopt);
    return maybe_wrap_sink(sst, type, std::move(upload_sink));
}
// Creates a data source for a data or index component by delegating to
// make_source() with the same arguments.
//
// `f` and `options` are by-value parameters that are not used again here, so
// they are moved into the callee instead of being copied.
future<data_source>
object_storage_base::make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options options) const {
    co_return co_await make_source(sst, type, std::move(f), offset, len, std::move(options));
}
// Creates a download source for the component's object and passes it, together
// with the requested offset and length, through maybe_wrap_source().
//
// The `f` argument and the stream options are not used by this implementation.
future<data_source>
object_storage_base::make_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options) const {
    auto object = make_object_name(sst, type);
    auto download = _client->make_download_source(std::move(object), abort_source());
    co_return co_await maybe_wrap_source(sst, type, std::move(download), offset, len);
}
// Creates a data source for a data or index component by forwarding every
// argument to s3_storage::make_source().
future<data_source>
s3_storage::make_data_or_index_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options options) const {
    auto source = co_await make_source(sst, type, std::move(f), offset, len, std::move(options));
    co_return source;
}
// Creates a data source for the given component.
//
// Reads starting at offset 0 go through object_storage_base::make_source().
// Reads starting anywhere else open the object as a read-only file (wrapped by
// maybe_wrap_file()) and build a file data source over the requested range.
future<data_source>
s3_storage::make_source(sstable& sst, component_type type, file f, uint64_t offset, uint64_t len, file_input_stream_options options) const {
    if (offset != 0) {
        auto component_file = co_await maybe_wrap_file(
                sst, type, open_flags::ro, _client->make_readable_file(make_object_name(sst, type), abort_source()));
        co_return make_file_data_source(std::move(component_file), offset, len, std::move(options));
    }
    co_return co_await object_storage_base::make_source(sst, type, std::move(f), offset, len, std::move(options));
}
// Creates an upload sink for the component's object and passes it through
// maybe_wrap_sink().
//
// `oflags` and `options` are not used by this implementation.
future<data_sink> object_storage_base::make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) {
    auto upload_sink = make_upload_sink(make_object_name(sst, type));
    return maybe_wrap_sink(sst, type, std::move(upload_sink));
}
// Marks the sstable's registry entry as sealed.
future<> object_storage_base::seal(const sstable& sst) {
    auto& registry = sst.manager().sstables_registry();
    co_await registry.update_entry_status(owner(), sst.generation(), status_sealed);
}
// Updates the state recorded for the sstable in the sstables registry.
//
// Fails with std::runtime_error when `generation` differs from the sstable's
// current generation. `delay` is not used by this implementation.
future<> object_storage_base::change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) {
    const bool generation_changes = (generation != sst._generation);
    if (generation_changes) {
        // The 'generation' field is clustering key in system.sstables and cannot be
        // changed. However, that's fine, state AND generation change means the sstable
        // is moved from upload directory and this is another issue for S3 (#13018)
        co_await coroutine::return_exception(std::runtime_error("Cannot change state and generation of an S3 object"));
    }
    auto& registry = sst.manager().sstables_registry();
    co_await registry.update_entry_state(owner(), sst.generation(), state);
}
// Removes the sstable from object storage in three steps:
//   1. flag its registry entry as being removed,
//   2. delete the object of every recognized component,
//   3. delete the registry entry itself.
future<> object_storage_base::wipe(const sstable& sst, sync_dir) noexcept {
    auto& registry = sst.manager().sstables_registry();

    co_await registry.update_entry_status(owner(), sst.generation(), status_removing);

    co_await coroutine::parallel_for_each(sst._recognized_components, [this, &sst] (auto component) -> future<> {
        co_await delete_object(make_object_name(sst, component));
    });

    co_await registry.delete_entry(owner(), sst.generation());
}
// Returns a default-constructed deletion context; the list of sstables is ignored.
future<atomic_delete_context> object_storage_base::atomic_delete_prepare(const std::vector<shared_sstable>&) const {
    // FIXME -- need atomicity, see #13567
    atomic_delete_context empty_context{};
    co_return empty_context;
}
// Completes an atomic deletion. This implementation does nothing with `ctx`
// and simply finishes.
future<> object_storage_base::atomic_delete_complete(atomic_delete_context ctx) const {
co_return;
}
// Deletes the objects of the sstable described by `desc`.
//
// The component list is read from the sstable's TOC object. A TOC that cannot
// be opened because of ENOENT is treated as an empty component list; any other
// storage_io_error is rethrown. All listed components except the TOC are
// deleted in parallel, and the TOC object is deleted last.
future<> object_storage_base::remove_by_registry_entry(entry_descriptor desc) {
    std::vector<sstring> toc_components;
    try {
        auto toc_name = object_name(_bucket, desc.generation, sstable_version_constants::get_component_map(desc.version).at(component_type::TOC));
        auto toc_file = make_readable_file(std::move(toc_name));
        toc_components = co_await with_closeable(std::move(toc_file), [] (file& opened) {
            return sstable::read_and_parse_toc(opened);
        });
    } catch (const storage_io_error& err) {
        const bool toc_missing = (err.code().value() == ENOENT);
        if (!toc_missing) {
            throw;
        }
    }

    co_await coroutine::parallel_for_each(toc_components, [this, &desc] (sstring component) -> future<> {
        if (component == sstable_version_constants::TOC_SUFFIX) {
            // The TOC goes last, after all other components.
            co_return;
        }
        co_await delete_object(object_name(_bucket, desc.generation, component));
    });

    co_await delete_object(object_name(_bucket, desc.generation, sstable_version_constants::TOC_SUFFIX));
}
// Deletes the object backing a single component.
//
// A failure of the delete request is logged as a warning and otherwise ignored.
future<> object_storage_base::unlink_component(const sstable& sst, component_type type) noexcept {
    auto object = make_object_name(sst, type);
    try {
        co_await _client->delete_object(object);
    } catch (...) {
        sstlog.warn("Failed to delete {}: {}. Ignoring.", object, std::current_exception());
    }
}
// Snapshotting is not implemented for this storage type: the call is reported
// through on_internal_error(). Neither `sst` nor `name` is used.
future<> object_storage_base::snapshot(const sstable& sst, sstring name) const {
on_internal_error(sstlog, "Snapshotting S3 objects not implemented");
// The co_return makes this function a coroutine.
co_return;
}
// Cloning is not implemented for this storage type: the call is reported
// through on_internal_error(). None of the arguments are used.
future<> object_storage_base::clone(const sstable& sst, generation_type gen, bool leave_unsealed) const {
on_internal_error(sstlog, "Cloning S3 objects not implemented");
// The co_return makes this function a coroutine.
co_return;
}
// Builds the storage backend that matches the given storage options.
//
//  - local options produce a filesystem_storage rooted at the options' `dir`;
//    an empty `dir` is reported through on_internal_error().
//  - object-storage options produce an s3_storage or a "GS" object_storage_base
//    depending on the options' type; an empty prefix or null table id location
//    is reported through on_internal_error(), and any other type throws
//    std::runtime_error.
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstable_state state) {
    using storage_ptr = std::unique_ptr<sstables::storage>;
    return std::visit(overloaded_functor {
        [state] (const data_dictionary::storage_options::local& local_opts) mutable -> storage_ptr {
            if (local_opts.dir.empty()) {
                on_internal_error(sstlog, "Local storage options is missing 'dir'");
            }
            return std::make_unique<sstables::filesystem_storage>(local_opts.dir.native(), state);
        },
        [&] (const data_dictionary::storage_options::object_storage& object_opts) mutable -> storage_ptr {
            const bool location_missing = std::visit(overloaded_functor {
                [] (const sstring& prefix) { return prefix.empty(); },
                [] (const table_id& owner) { return owner.id.is_null(); }
            }, object_opts.location);
            if (location_missing) {
                on_internal_error(sstlog, fmt::format("{} storage options is missing 'location'", object_opts.name()));
            }
            if (s_opts.is_s3_type()) {
                return std::make_unique<sstables::s3_storage>(
                        manager.get_endpoint_client(object_opts.endpoint), object_opts.bucket, object_opts.location, object_opts.abort_source);
            }
            if (s_opts.is_gs_type()) {
                return std::make_unique<sstables::object_storage_base>(
                        "GS", manager.get_endpoint_client(object_opts.endpoint), object_opts.bucket, object_opts.location, object_opts.abort_source);
            }
            throw std::runtime_error(fmt::format("Not implemented: '{}'", object_opts.type));
        }
    }, s_opts.value);
}
// Creates the on-disk directories of a table under every configured data file
// directory and returns local storage options pointing at the first of them.
//
// Each table directory is "<data dir>/<keyspace>/<table>-<table id without
// dashes>" and gets "upload" and "staging" subdirectories. The incoming `so`
// is not used.
//
// Reports an internal error when no data file directory is configured, since
// there would be no directory to return.
static future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const sstables_manager& mgr, const schema& s, const data_dictionary::storage_options::local& so) {
    // The dash-less table id is identical for every data directory, so compute it once.
    auto uuid_sstring = s.id().to_sstring();
    boost::erase_all(uuid_sstring, "-");

    const auto& data_dirs = mgr.get_config().data_file_directories;
    std::vector<sstring> dirs;
    dirs.reserve(data_dirs.size());
    for (const auto& dd : data_dirs) {
        dirs.emplace_back(format("{}/{}/{}-{}", dd, s.ks_name(), s.cf_name(), uuid_sstring));
    }
    if (dirs.empty()) {
        // Guard the dirs[0] access below.
        on_internal_error(sstlog, "Cannot initialize local table storage: no data file directories are configured");
    }

    co_await coroutine::parallel_for_each(dirs, [] (sstring dir) -> future<> {
        co_await io_check([&dir] { return recursive_touch_directory(dir); });
        co_await io_check([&dir] { return touch_directory(dir + "/upload"); });
        co_await io_check([&dir] { return touch_directory(dir + "/staging"); });
    });

    data_dictionary::storage_options nopts;
    nopts.value = data_dictionary::storage_options::local {
        .dir = fs::path(std::move(dirs[0])),
    };
    co_return make_lw_shared<const data_dictionary::storage_options>(std::move(nopts));
}
// Returns, for every entry of `data_file_directories`, the path of the table
// directory below it. The relative table path is taken from the last two
// components of `so.dir`.
std::vector<std::filesystem::path> get_local_directories(const std::vector<sstring>& data_file_directories, const data_dictionary::storage_options::local& so) {
    // see how this path is formatted by init_table_storage() above
    const auto table_dir = so.dir.parent_path().filename() / so.dir.filename();

    std::vector<std::filesystem::path> table_dirs;
    table_dirs.reserve(data_file_directories.size());
    for (const auto& datadir : data_file_directories) {
        table_dirs.emplace_back(std::filesystem::path(datadir) / table_dir);
    }
    return table_dirs;
}
// Builds the storage options of a table kept in object storage: bucket,
// endpoint and type are copied from `so`, and the location is set to the
// table's id. `mgr` is not used.
static future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const sstables_manager& mgr, const schema& s, const data_dictionary::storage_options::object_storage& so) {
    data_dictionary::storage_options::object_storage table_opts {
        .bucket = so.bucket,
        .endpoint = so.endpoint,
        .location = s.id(),
        .type = so.type
    };
    data_dictionary::storage_options result;
    result.value = std::move(table_opts);
    co_return make_lw_shared<const data_dictionary::storage_options>(std::move(result));
}
// Initializes table storage by dispatching on the concrete kind of storage
// options held in `so.value` to the matching init_table_storage() overload.
future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const sstables_manager& mgr, const schema& s, const data_dictionary::storage_options& so) {
    auto init_for = [&mgr, &s] (const auto& concrete_opts) {
        return init_table_storage(mgr, s, concrete_opts);
    };
    co_return co_await std::visit(init_for, so.value);
}
// Prepares storage for a keyspace.
//
// For local storage, the keyspace directory is touched under the first
// configured data file directory; nothing happens when no data file directory
// is configured. For object storage nothing is done.
future<> init_keyspace_storage(const sstables_manager& mgr, const data_dictionary::storage_options& so, sstring ks_name) {
    auto visitor = overloaded_functor {
        [&mgr, &ks_name] (const data_dictionary::storage_options::local&) -> future<> {
            const auto& data_dirs = mgr.get_config().data_file_directories;
            if (data_dirs.size() == 0) {
                co_return;
            }
            auto ks_dir = format("{}/{}", data_dirs[0], ks_name);
            co_await io_check([&ks_dir] { return touch_directory(ks_dir); });
        },
        [] (const data_dictionary::storage_options::object_storage&) -> future<> {
            co_return;
        }
    };
    co_await std::visit(visitor, so.value);
}
// Tears down the storage of a table.
//
// For local storage, the table directory is handed to
// remove_table_directory_if_has_no_snapshots(); an empty `dir` is reported
// through on_internal_error(). For object storage nothing is done.
future<> destroy_table_storage(const data_dictionary::storage_options& so) {
    auto visitor = overloaded_functor {
        [] (const data_dictionary::storage_options::local& local_opts) -> future<> {
            if (local_opts.dir.empty()) {
                on_internal_error(sstlog, "Non-table local storage options");
            }
            co_await sstables::remove_table_directory_if_has_no_snapshots(local_opts.dir);
        },
        [] (const data_dictionary::storage_options::object_storage&) -> future<> {
            co_return;
        }
    };
    co_await std::visit(visitor, so.value);
}
} // namespace sstables