Merge 'Encapsulate filesystem access by sstable into filesystem_storage subsclass' from Pavel Emelyanov

This is to define the API sstable needs from underlying storage. When implementing object-storage backend it will need to implement those. The API looks like

        future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const;
        future<> quarantine(const sstable& sst, delayed_commit_changes* delay);
        future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay);
        void open(sstable& sst, const io_priority_class& pc); // runs in async context
        future<> wipe(const sstable& sst) noexcept;

        future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity);

It doesn't have "list" or alike, because it's not a method of an individual sstable, but rather the one from sstables_manager. It will come as separate PR.

Closes #12217

* github.com:scylladb/scylladb:
  sstable, storage: Mark dir/temp_dir private
  sstable: Remove get_dir() (well, almost)
  sstable: Add quarantine() method to storage
  sstable: Use absolute/relative path marking for snapshot()
  sstable: Remove temp_... stuff from sstable
  sstable: Move open_component() on storage
  sstable: Mark rename_new_sstable_component_file() const
  sstable: Print filename(type) on open-component error
  sstable: Reorganize new_sstable_component_file()
  sstable: Mark filename() private
  sstable: Introduce index_filename()
  tests: Disclosure private filename() calls
  sstable: Move wipe_storage() on storage
  sstable: Remove temp dir in wipe_storage()
  sstable: Move unlink parts into wipe_storage
  sstable: Remove get_temp_dir()
  sstable: Move write_toc() to storage
  sstable: Shuffle open_sstable()
  sstable: Move touch_temp_dir() to storage
  sstable: Move move() to storage
  sstable: Move create_links() to storage
  sstable: Move seal_sstable() to storage
  sstable: Tossing internals of seal_sstable()
  sstable: Move remove_temp_dir() to storage
  sstable: Move create_links_common() to storage
  sstable: Move check_create_links_replay() to storage
  sstable: Remove one of create_links() overloads
  sstable: Remove create_links_and_mark_for_removal()
  sstable: Indentation fix after prevuous patch
  sstable: Coroutinize create_links_common()
  sstable: Rename create_links_common()'s "dir" argument
  sstable: Make mark_for_removal bool_class
  sstable, table: Add sstable::snapshot() and use in table::take_snapshot
  sstable: Move _dir and _temp_dir on filesystem_storage
  sstable: Use sync_directory() method
  test, sstable: Use component_basename in test
  sstables: Move read_{digest|checksum} on sstable
This commit is contained in:
Avi Kivity
2022-12-18 17:29:35 +02:00
12 changed files with 286 additions and 253 deletions

View File

@@ -1501,7 +1501,7 @@ future<table::snapshot_file_set> table::take_snapshot(database& db, sstring json
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return with_semaphore(db.get_sharded_sst_dir_semaphore().local()._sem, 1, [&jsondir, sstable] {
return io_check([sstable, &dir = jsondir] {
return sstable->create_links(dir);
return sstable->snapshot(dir);
});
});
});

View File

@@ -157,10 +157,10 @@ private:
public:
void verify_end_state() const {
if (this->_remain > 0) {
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): parsing ended but there is unconsumed data", _state), _sst.filename(component_type::Index));
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): parsing ended but there is unconsumed data", _state), _sst.index_filename());
}
if (_state != state::KEY_SIZE && _state != state::START) {
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): cannot finish parsing current entry, no more data", _state), _sst.filename(component_type::Index));
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): cannot finish parsing current entry, no more data", _state), _sst.index_filename());
}
}
@@ -308,7 +308,7 @@ inline file make_tracked_index_file(sstable& sst, reader_permit permit, tracing:
if (!trace_state) {
return f;
}
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.filename(component_type::Index)));
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.index_filename()));
}
inline
@@ -541,7 +541,7 @@ private:
bound.current_index_idx = 0;
bound.current_pi_idx = 0;
if (bound.current_list->empty()) {
throw malformed_sstable_exception(format("missing index entry for summary index {} (bound {})", summary_idx, fmt::ptr(&bound)), _sstable->filename(component_type::Index));
throw malformed_sstable_exception(format("missing index entry for summary index {} (bound {})", summary_idx, fmt::ptr(&bound)), _sstable->index_filename());
}
bound.data_file_position = bound.current_list->_entries[0]->position();
bound.element = indexable_element::partition;

View File

@@ -787,8 +787,7 @@ public:
// exactly what callers used to do anyway.
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
_sst.write_toc(_pc);
_sst.open_sstable(_pc);
_sst.create_data().get();
_compression_enabled = !_sst.has_component(component_type::CRC);
init_file_writers();

View File

@@ -488,11 +488,11 @@ future<> sstable_directory::delete_atomically(std::vector<shared_sstable> ssts)
gen_tracker.update(sst->generation());
if (sstdir.empty()) {
sstdir = sst->get_dir();
sstdir = sst->_storage.prefix();
} else {
// All sstables are assumed to be in the same column_family, hence
// sharing their base directory.
assert (sstdir == sst->get_dir());
assert (sstdir == sst->_storage.prefix());
}
}

View File

@@ -133,21 +133,35 @@ static future<file> open_sstable_component_file_non_checked(std::string_view nam
return open_file_dma(name, flags, options);
}
future<> sstable::rename_new_sstable_component_file(sstring from_name, sstring to_name) {
future<> sstable::rename_new_sstable_component_file(sstring from_name, sstring to_name) const {
return sstable_write_io_check(rename_file, from_name, to_name).handle_exception([from_name, to_name] (std::exception_ptr ep) {
sstlog.error("Could not rename SSTable component {} to {}. Found exception: {}", from_name, to_name, ep);
return make_exception_future<>(ep);
});
}
future<file> sstable::new_sstable_component_file(const io_error_handler& error_handler, component_type type, open_flags flags, file_open_options options) noexcept {
try {
future<file> sstable::filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
auto create_flags = open_flags::create | open_flags::exclusive;
auto readonly = (flags & create_flags) != create_flags;
auto name = !readonly && _temp_dir ? temp_filename(type) : filename(type);
auto tgt_dir = !readonly && temp_dir ? dir + "/" + sstable::sst_dir_basename(sst._generation) : dir;
auto name = sst.filename(tgt_dir, type);
auto f = open_sstable_component_file_non_checked(name, flags, options,
_manager.config().enable_sstable_data_integrity_check());
auto f = open_sstable_component_file_non_checked(name, flags, options, check_integrity);
if (!readonly) {
f = with_file_close_on_failure(std::move(f), [&sst, type, name = std::move(name)] (file fd) mutable {
return sst.rename_new_sstable_component_file(name, sst.filename(type)).then([fd = std::move(fd)] () mutable {
return make_ready_future<file>(std::move(fd));
});
});
}
return f;
}
future<file> sstable::new_sstable_component_file(const io_error_handler& error_handler, component_type type, open_flags flags, file_open_options options) noexcept {
try {
auto f = _storage.open_component(*this, type, flags, options, _manager.config().enable_sstable_data_integrity_check());
if (type != component_type::TOC && type != component_type::TemporaryTOC) {
for (auto * ext : _manager.config().extensions().sstable_file_io_extensions()) {
@@ -163,17 +177,10 @@ future<file> sstable::new_sstable_component_file(const io_error_handler& error_h
return make_checked_file(error_handler, std::move(f));
});
if (!readonly) {
f = with_file_close_on_failure(std::move(f).handle_exception([name] (auto ep) {
sstlog.error("Could not create SSTable component {}. Found exception: {}", name, ep);
return make_exception_future<file>(ep);
}), [this, type, name = std::move(name)] (file fd) mutable {
return rename_new_sstable_component_file(name, filename(type)).then([fd = std::move(fd)] () mutable {
return make_ready_future<file>(std::move(fd));
});
});
}
return f;
return f.handle_exception([this, type] (auto ep) {
sstlog.error("Could not create SSTable component {}. Found exception: {}", filename(type), ep);
return make_exception_future<file>(ep);
});
} catch (...) {
return current_exception_as_future<file>();
}
@@ -867,7 +874,7 @@ future<> sstable::read_toc() noexcept {
}
void sstable::generate_toc(compressor_ptr c, double filter_fp_chance) {
void sstable::generate_toc() {
// Creating table of components.
_recognized_components.insert(component_type::TOC);
_recognized_components.insert(component_type::Statistics);
@@ -875,10 +882,10 @@ void sstable::generate_toc(compressor_ptr c, double filter_fp_chance) {
_recognized_components.insert(component_type::Index);
_recognized_components.insert(component_type::Summary);
_recognized_components.insert(component_type::Data);
if (filter_fp_chance != 1.0) {
if (_schema->bloom_filter_fp_chance() != 1.0) {
_recognized_components.insert(component_type::Filter);
}
if (c == nullptr) {
if (_schema->get_compressor_params().get_compressor() == nullptr) {
_recognized_components.insert(component_type::CRC);
} else {
_recognized_components.insert(component_type::CompressionInfo);
@@ -926,9 +933,14 @@ future<file_writer> sstable::make_component_file_writer(component_type c, file_o
});
}
void sstable::write_toc(const io_priority_class& pc) {
touch_temp_dir().get0();
auto file_path = filename(component_type::TemporaryTOC);
void sstable::open_sstable(const io_priority_class& pc) {
generate_toc();
_storage.open(*this, pc);
}
void sstable::filesystem_storage::open(sstable& sst, const io_priority_class& pc) {
touch_temp_dir(sst).get0();
auto file_path = sst.filename(component_type::TemporaryTOC);
sstlog.debug("Writing TOC file {} ", file_path);
@@ -939,51 +951,44 @@ void sstable::write_toc(const io_priority_class& pc) {
file_output_stream_options options;
options.buffer_size = 4096;
options.io_priority_class = pc;
auto w = make_component_file_writer(component_type::TemporaryTOC, std::move(options)).get0();
auto w = sst.make_component_file_writer(component_type::TemporaryTOC, std::move(options)).get0();
bool toc_exists = file_exists(filename(component_type::TOC)).get0();
bool toc_exists = file_exists(sst.filename(component_type::TOC)).get0();
if (toc_exists) {
// TOC will exist at this point if write_components() was called with
// the generation of a sstable that exists.
w.close();
remove_file(file_path).get();
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {:d} of {}.{}", _generation, _schema->ks_name(), _schema->cf_name()));
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {:d} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name()));
}
for (auto&& key : _recognized_components) {
for (auto&& key : sst._recognized_components) {
// new line character is appended to the end of each component name.
auto value = sstable_version_constants::get_component_map(_version).at(key) + "\n";
auto value = sstable_version_constants::get_component_map(sst._version).at(key) + "\n";
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
write(_version, w, b);
write(sst._version, w, b);
}
w.flush();
w.close();
// Flushing parent directory to guarantee that temporary TOC file reached
// the disk.
file dir_f = open_checked_directory(_write_error_handler, _dir).get0();
sstable_write_io_check([&] {
dir_f.flush().get();
dir_f.close().get();
});
sst.sstable_write_io_check(sync_directory, dir).get();
}
future<> sstable::seal_sstable() {
future<> sstable::filesystem_storage::seal(const sstable& sst) {
// SSTable sealing is about renaming temporary TOC file after guaranteeing
// that each component reached the disk safely.
co_await remove_temp_dir();
auto dir_f = co_await open_checked_directory(_write_error_handler, _dir);
auto dir_f = co_await open_checked_directory(sst._write_error_handler, dir);
// Guarantee that every component of this sstable reached the disk.
co_await sstable_write_io_check([&] { return dir_f.flush(); });
co_await sst.sstable_write_io_check([&] { return dir_f.flush(); });
// Rename TOC because it's no longer temporary.
co_await sstable_write_io_check(rename_file, filename(component_type::TemporaryTOC), filename(component_type::TOC));
co_await sstable_write_io_check([&] { return dir_f.flush(); });
co_await sstable_write_io_check([&] { return dir_f.close(); });
if (_marked_for_deletion == mark_for_deletion::implicit) {
_marked_for_deletion = mark_for_deletion::none;
}
co_await sst.sstable_write_io_check(rename_file, sst.filename(component_type::TemporaryTOC), sst.filename(component_type::TOC));
co_await sst.sstable_write_io_check([&] { return dir_f.flush(); });
co_await sst.sstable_write_io_check([&] { return dir_f.close(); });
// If this point was reached, sstable should be safe in disk.
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _schema->ks_name(), _schema->cf_name());
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", sst._generation, sst._schema->ks_name(), sst._schema->cf_name());
}
void sstable::write_crc(const checksum& c) {
@@ -1755,13 +1760,12 @@ bool sstable::may_contain_rows(const query::clustering_row_ranges& ranges) const
future<> sstable::seal_sstable(bool backup)
{
return seal_sstable().then([this, backup] {
return _storage.seal(*this).then([this, backup] {
if (_marked_for_deletion == mark_for_deletion::implicit) {
_marked_for_deletion = mark_for_deletion::none;
}
if (backup) {
auto dir = get_dir() + "/backups/";
auto fut = sstable_touch_directory_io_check(dir);
return fut.then([this, dir = std::move(dir)] () mutable {
return create_links(std::move(dir));
});
return _storage.snapshot(*this, "backups", filesystem_storage::absolute_path::no);
}
return make_ready_future<>();
});
@@ -1962,26 +1966,26 @@ void sstable::validate_originating_host_id() const {
}
}
future<> sstable::touch_temp_dir() {
if (_temp_dir) {
future<> sstable::filesystem_storage::touch_temp_dir(const sstable& sst) {
if (temp_dir) {
return make_ready_future<>();
}
auto temp_dir = get_temp_dir();
sstlog.debug("Touching temp_dir={}", temp_dir);
auto fut = sstable_touch_directory_io_check(temp_dir);
return fut.then([this, temp_dir = std::move(temp_dir)] () mutable {
_temp_dir = std::move(temp_dir);
auto tmp = dir + "/" + sstable::sst_dir_basename(sst._generation);
sstlog.debug("Touching temp_dir={}", tmp);
auto fut = sst.sstable_touch_directory_io_check(tmp);
return fut.then([this, tmp = std::move(tmp)] () mutable {
temp_dir = std::move(tmp);
});
}
future<> sstable::remove_temp_dir() {
if (!_temp_dir) {
future<> sstable::filesystem_storage::remove_temp_dir() {
if (!temp_dir) {
return make_ready_future<>();
}
sstlog.debug("Removing temp_dir={}", _temp_dir);
return remove_file(*_temp_dir).then_wrapped([this] (future<> f) {
sstlog.debug("Removing temp_dir={}", temp_dir);
return remove_file(*temp_dir).then_wrapped([this] (future<> f) {
if (!f.failed()) {
_temp_dir.reset();
temp_dir.reset();
return make_ready_future<>();
}
auto ep = f.get_exception();
@@ -2001,15 +2005,15 @@ std::vector<sstring> sstable::component_filenames() const {
}
bool sstable::requires_view_building() const {
return boost::algorithm::ends_with(_dir, staging_dir);
return boost::algorithm::ends_with(_storage.prefix(), staging_dir);
}
bool sstable::is_quarantined() const noexcept {
return boost::algorithm::ends_with(_dir, quarantine_dir);
return boost::algorithm::ends_with(_storage.prefix(), quarantine_dir);
}
bool sstable::is_uploaded() const noexcept {
return boost::algorithm::ends_with(_dir, upload_dir);
return boost::algorithm::ends_with(_storage.prefix(), upload_dir);
}
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
@@ -2096,12 +2100,12 @@ future<> idempotent_link_file(sstring oldpath, sstring newpath) noexcept {
// from staging to the base dir, for example, right after create_links completes,
// and right before deleting the source links.
// We end up in two valid sstables in this case, so make create_links idempotent.
future<> sstable::check_create_links_replay(const sstring& dst_dir, generation_type dst_gen,
future<> sstable::filesystem_storage::check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen,
const std::vector<std::pair<sstables::component_type, sstring>>& comps) const {
return parallel_for_each(comps, [this, &dst_dir, dst_gen] (const auto& p) mutable {
return parallel_for_each(comps, [this, &sst, &dst_dir, dst_gen] (const auto& p) mutable {
auto comp = p.second;
auto src = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, comp);
auto dst = sstable::filename(dst_dir, _schema->ks_name(), _schema->cf_name(), _version, dst_gen, _format, comp);
auto src = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, comp);
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, dst_gen, sst._format, comp);
return do_with(std::move(src), std::move(dst), [this] (const sstring& src, const sstring& dst) mutable {
return file_exists(dst).then([&, this] (bool exists) mutable {
if (!exists) {
@@ -2117,7 +2121,7 @@ future<> sstable::check_create_links_replay(const sstring& dst_dir, generation_t
if (!same) {
auto msg = format("Error while linking SSTable: {} to {}: File exists", src, dst);
sstlog.error("{}", msg);
return make_exception_future<>(malformed_sstable_exception(msg, _dir));
return make_exception_future<>(malformed_sstable_exception(msg, dir));
}
return make_ready_future<>();
});
@@ -2161,90 +2165,100 @@ future<> sstable::check_create_links_replay(const sstring& dst_dir, generation_t
/// are used. These versions handle EEXIST errors that may happen
/// when the respective operations are replayed.
///
/// \param dir - the destination directory.
/// \param sst - the sstable to work on
/// \param dst_dir - the destination directory.
/// \param generation - the generation of the destination sstable
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dir
future<> sstable::create_links_common(const sstring& dir, generation_type generation, bool mark_for_removal) const {
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", get_filename(), dir, generation, mark_for_removal);
return do_with(dir, all_components(), [this, generation, mark_for_removal] (const sstring& dir, auto& comps) {
return check_create_links_replay(dir, generation, comps).then([this, &dir, generation, &comps, mark_for_removal] {
// TemporaryTOC is always first, TOC is always last
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, component_type::TemporaryTOC);
return sstable_write_io_check(idempotent_link_file, filename(component_type::TOC), std::move(dst)).then([this, &dir] {
return sstable_write_io_check(sync_directory, dir);
}).then([this, &dir, generation, &comps] {
return parallel_for_each(comps, [this, &dir, generation] (auto p) {
auto src = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, p.second);
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, p.second);
return sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
});
}).then([this, &dir] {
return sstable_write_io_check(sync_directory, dir);
});
}).then([this, &dir, generation, mark_for_removal] {
auto dst_temp_toc = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, component_type::TemporaryTOC);
if (mark_for_removal) {
// Now that the source sstable is linked to new_dir, mark the source links for
// deletion by leaving a TemporaryTOC file in the source directory.
auto src_temp_toc = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, component_type::TemporaryTOC);
return sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc)).then([this] {
return sstable_write_io_check(sync_directory, _dir);
});
} else {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
return sstable_write_io_check(remove_file, std::move(dst_temp_toc));
}
}).then([this, &dir] {
return sstable_write_io_check(sync_directory, dir);
}).then([this, &dir, generation] {
sstlog.trace("create_links: {} -> {} generation={}: done", get_filename(), dir, generation);
});
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
future<> sstable::filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, mark_for_removal mark_for_removal) const {
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
auto comps = sst.all_components();
co_await check_create_links_replay(sst, dst_dir, generation, comps);
// TemporaryTOC is always first, TOC is always last
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(idempotent_link_file, sst.filename(component_type::TOC), std::move(dst));
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
auto src = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, p.second);
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, p.second);
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
});
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
auto dst_temp_toc = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, component_type::TemporaryTOC);
if (mark_for_removal) {
// Now that the source sstable is linked to new_dir, mark the source links for
// deletion by leaving a TemporaryTOC file in the source directory.
auto src_temp_toc = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
co_await sst.sstable_write_io_check(sync_directory, dir);
} else {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
}
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
}
future<> sstable::create_links(const sstring& dir, generation_type generation) const {
return create_links_common(dir, generation, false /* mark_for_removal */);
future<> sstable::filesystem_storage::create_links(const sstable& sst, const sstring& dir) const {
return create_links_common(sst, dir, sst._generation, mark_for_removal::no);
}
future<> sstable::create_links_and_mark_for_removal(const sstring& dir, generation_type generation) const {
return create_links_common(dir, generation, true /* mark_for_removal */);
future<> sstable::filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs) const {
if (!abs) {
dir = this->dir + "/" + dir + "/";
}
co_await sst.sstable_touch_directory_io_check(dir);
co_await create_links(sst, dir);
}
future<> sstable::move_to_new_dir(sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
sstring old_dir = get_dir();
future<> sstable::snapshot(const sstring& dir) const {
return _storage.snapshot(*this, dir, filesystem_storage::absolute_path::yes);
}
future<> sstable::filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
co_await touch_directory(new_dir);
sstring old_dir = dir;
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
get_filename(), _generation, new_dir, new_generation, delay_commit == nullptr);
co_await create_links_and_mark_for_removal(new_dir, new_generation);
_dir = new_dir;
generation_type old_generation = std::exchange(_generation, new_generation);
co_await coroutine::parallel_for_each(all_components(), [this, old_generation, old_dir] (auto p) {
return sstable_write_io_check(remove_file, sstable::filename(old_dir, _schema->ks_name(), _schema->cf_name(), _version, old_generation, _format, p.second));
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
dir = new_dir;
generation_type old_generation = sst._generation;
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
return sst.sstable_write_io_check(remove_file, sstable::filename(old_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, old_generation, sst._format, p.second));
});
auto temp_toc = sstable_version_constants::get_component_map(_version).at(component_type::TemporaryTOC);
co_await sstable_write_io_check(remove_file, sstable::filename(old_dir, _schema->ks_name(), _schema->cf_name(), _version, old_generation, _format, temp_toc));
auto temp_toc = sstable_version_constants::get_component_map(sst._version).at(component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(remove_file, sstable::filename(old_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, old_generation, sst._format, temp_toc));
if (delay_commit == nullptr) {
co_await when_all(sstable_write_io_check(sync_directory, old_dir), sstable_write_io_check(sync_directory, new_dir)).discard_result();
co_await when_all(sst.sstable_write_io_check(sync_directory, old_dir), sst.sstable_write_io_check(sync_directory, new_dir)).discard_result();
} else {
delay_commit->_dirs.insert(old_dir);
delay_commit->_dirs.insert(new_dir);
}
}
future<> sstable::move_to_quarantine(delayed_commit_changes* delay_commit) {
auto path = fs::path(_dir);
sstring basename = path.filename().native();
if (basename == quarantine_dir) {
co_return;
} else if (basename == staging_dir) {
future<> sstable::move_to_new_dir(sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
co_await _storage.move(*this, std::move(new_dir), new_generation, delay_commit);
_generation = std::move(new_generation);
}
future<> sstable::filesystem_storage::quarantine(const sstable& sst, delayed_commit_changes* delay_commit) {
auto path = fs::path(dir);
if (path.filename().native() == staging_dir) {
path = path.parent_path();
}
// Note: moving a sstable in a snapshot or in the uploads dir to quarantine
// will move it into a "quarantine" subdirectory of its current directory.
auto new_dir = (path / sstables::quarantine_dir).native();
sstlog.info("Moving SSTable {} to quarantine in {}", get_filename(), new_dir);
co_await touch_directory(new_dir);
co_await move_to_new_dir(std::move(new_dir), generation(), delay_commit);
sstlog.info("Moving SSTable {} to quarantine in {}", sst.get_filename(), new_dir);
co_await move(sst, std::move(new_dir), sst.generation(), delay_commit);
}
future<> sstable::move_to_quarantine(delayed_commit_changes* delay_commit) {
if (is_quarantined()) {
return make_ready_future<>();
}
return _storage.quarantine(*this, delay_commit);
}
future<> sstable::delayed_commit_changes::commit() {
@@ -2528,8 +2542,12 @@ static future<bool> do_validate_uncompressed(input_stream<char>& stream, const c
co_return valid;
}
static future<uint32_t> read_digest(sstring filename, file_input_stream_options options) {
auto digest_file = co_await open_file_dma(filename, open_flags::ro);
future<uint32_t> sstable::read_digest(io_priority_class pc) {
file_input_stream_options options;
options.buffer_size = 4096;
options.io_priority_class = pc;
auto digest_file = co_await open_file_dma(filename(component_type::Digest), open_flags::ro);
auto digest_stream = make_file_input_stream(std::move(digest_file), options);
std::exception_ptr ex;
@@ -2547,8 +2565,12 @@ static future<uint32_t> read_digest(sstring filename, file_input_stream_options
co_return boost::lexical_cast<uint32_t>(digest_str);
}
static future<checksum> read_checksum(sstring filename, file_input_stream_options options) {
auto crc_file = co_await open_file_dma(filename, open_flags::ro);
future<checksum> sstable::read_checksum(io_priority_class pc) {
file_input_stream_options options;
options.buffer_size = 4096;
options.io_priority_class = pc;
auto crc_file = co_await open_file_dma(filename(component_type::CRC), open_flags::ro);
auto crc_stream = make_file_input_stream(std::move(crc_file), options);
checksum checksum;
@@ -2578,11 +2600,7 @@ static future<checksum> read_checksum(sstring filename, file_input_stream_option
}
future<bool> validate_checksums(shared_sstable sst, reader_permit permit, const io_priority_class& pc) {
file_input_stream_options options;
options.buffer_size = 4096;
options.io_priority_class = pc;
const auto digest = co_await read_digest(sst->filename(component_type::Digest), options);
const auto digest = co_await sst->read_digest(pc);
auto data_stream = sst->data_stream(0, sst->ondisk_data_size(), pc, permit, nullptr, nullptr, sstable::raw_stream::yes);
@@ -2597,7 +2615,7 @@ future<bool> validate_checksums(shared_sstable sst, reader_permit permit, const
valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, digest);
}
} else {
auto checksum = co_await read_checksum(sst->filename(component_type::CRC), options);
auto checksum = co_await sst->read_checksum(pc);
if (sst->get_version() >= sstable_version_types::mc) {
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, checksum, digest);
} else {
@@ -2754,20 +2772,6 @@ future<> sstable::close_files() {
} catch (...) {
sstlog.warn("Exception when deleting sstable file: {}", std::current_exception());
}
if (_temp_dir) {
try {
unlinked_temp_dir = recursive_remove_directory(fs::path(*_temp_dir)).then_wrapped([this] (future<> f) {
if (f.failed()) {
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, f.get_exception());
} else {
_temp_dir.reset();
}
});
} catch (...) {
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, std::current_exception());
}
}
}
_on_closed(*this);
@@ -3006,37 +3010,47 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
return utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(s, key)));
}
future<>
sstable::unlink() noexcept {
future<> sstable::filesystem_storage::wipe(const sstable& sst) noexcept {
// We must be able to generate toc_filename()
// in order to delete the sstable.
// Running out of memory here will terminate.
auto name = [this] () noexcept {
auto name = [&sst] () noexcept {
memory::scoped_critical_alloc_section _;
return toc_filename();
return sst.toc_filename();
}();
// remove_by_toc_name doesn't throw
auto fut = remove_by_toc_name(name);
// remove_fut never fails
auto remove_fut = fut.then_wrapped([&name] (future<> f) {
if (f.failed()) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
// b. in the future sstables_manager is planned to handle sstables deletion.
// c. Eventually we may want to record these failures in a system table
// and notify the administrator about that for manual handling (rather than aborting).
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, f.get_exception());
try {
co_await remove_by_toc_name(name);
} catch (...) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
// b. in the future sstables_manager is planned to handle sstables deletion.
// c. Eventually we may want to record these failures in a system table
// and notify the administrator about that for manual handling (rather than aborting).
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, std::current_exception());
}
if (temp_dir) {
try {
co_await recursive_remove_directory(fs::path(*temp_dir));
temp_dir.reset();
} catch (...) {
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *temp_dir, std::current_exception());
}
return make_ready_future<>();
});
}
}
future<>
sstable::unlink() noexcept {
auto remove_fut = _storage.wipe(*this);
try {
co_await get_large_data_handler().maybe_delete_large_data_entries(shared_from_this());
} catch (...) {
memory::scoped_critical_alloc_section _;
// Just log and ignore failures to delete large data entries.
// They are not critical to the operation of the database.
sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", name, std::current_exception());
sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", toc_filename(), std::current_exception());
}
co_await std::move(remove_fut);
@@ -3185,8 +3199,8 @@ sstable::sstable(schema_ptr schema,
size_t buffer_size)
: sstable_buffer_size(buffer_size)
, _schema(std::move(schema))
, _dir(std::move(dir))
, _generation(generation)
, _storage(std::move(dir))
, _version(v)
, _format(f)
, _index_cache(std::make_unique<partition_index_cache>(

View File

@@ -351,14 +351,6 @@ public:
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
sstring filename(component_type f) const {
return filename(get_dir(), f);
}
sstring temp_filename(component_type f) const {
return filename(get_temp_dir(), f);
}
sstring get_filename() const {
return filename(component_type::Data);
}
@@ -367,12 +359,12 @@ public:
return filename(component_type::TOC);
}
static sstring sst_dir_basename(generation_type gen) {
return fmt::format("{}.sstable", gen);
sstring index_filename() const {
return filename(component_type::Index);
}
static sstring temp_sst_dir(const sstring& dir, generation_type gen) {
return dir + "/" + sst_dir_basename(gen);
static sstring sst_dir_basename(generation_type gen) {
return fmt::format("{}.sstable", gen);
}
static bool is_temp_dir(const fs::path& dirpath)
@@ -397,11 +389,7 @@ public:
std::vector<std::pair<component_type, sstring>> all_components() const;
future<> create_links(const sstring& dir, generation_type generation) const;
future<> create_links(const sstring& dir) const {
return create_links(dir, _generation);
}
future<> snapshot(const sstring& dir) const;
// Delete the sstable by unlinking all sstable files
// Ignores all errors.
@@ -473,19 +461,47 @@ public:
const position_in_partition& last_partition_last_position() const noexcept {
return _last_partition_last_position;
}
using mark_for_removal = bool_class<class mark_for_removal_tag>;
class filesystem_storage {
friend class test;
sstring dir;
std::optional<sstring> temp_dir; // Valid while the sstable is being created, until sealed
private:
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
future<> remove_temp_dir();
future<> create_links(const sstable& sst, const sstring& dir) const;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal mark_for_removal) const;
future<> touch_temp_dir(const sstable& sst);
public:
explicit filesystem_storage(sstring dir_) : dir(std::move(dir_)) {}
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
future<> seal(const sstable& sst);
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const;
future<> quarantine(const sstable& sst, delayed_commit_changes* delay);
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay);
// runs in async context
void open(sstable& sst, const io_priority_class& pc);
future<> wipe(const sstable& sst) noexcept;
future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity);
sstring prefix() const { return dir; }
};
private:
sstring filename(component_type f) const {
return filename(_storage.prefix(), f);
}
sstring filename(const sstring& dir, component_type f) const {
return filename(dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
friend class sstable_directory;
const sstring& get_dir() const {
return _dir;
}
const sstring get_temp_dir() const {
return temp_sst_dir(_dir, _generation);
}
size_t sstable_buffer_size;
@@ -523,10 +539,10 @@ private:
lw_shared_ptr<file_input_stream_history> _index_history = make_lw_shared<file_input_stream_history>();
schema_ptr _schema;
sstring _dir;
std::optional<sstring> _temp_dir; // Valid while the sstable is being created, until sealed
generation_type _generation{0};
filesystem_storage _storage;
version_types _version;
format_types _format;
@@ -577,18 +593,14 @@ private:
void write_crc(const checksum& c);
void write_digest(uint32_t full_checksum);
future<> rename_new_sstable_component_file(sstring from_file, sstring to_file);
future<> rename_new_sstable_component_file(sstring from_file, sstring to_file) const;
future<file> new_sstable_component_file(const io_error_handler& error_handler, component_type f, open_flags flags, file_open_options options = {}) noexcept;
future<file_writer> make_component_file_writer(component_type c, file_output_stream_options options,
open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept;
future<> touch_temp_dir();
future<> remove_temp_dir();
void generate_toc(compressor_ptr c, double filter_fp_chance);
void write_toc(const io_priority_class& pc);
future<> seal_sstable();
void generate_toc();
void open_sstable(const io_priority_class& pc);
future<> read_compression(const io_priority_class& pc);
void write_compression(const io_priority_class& pc);
@@ -696,10 +708,6 @@ private:
}
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
future<> check_create_links_replay(const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
future<> create_links_common(const sstring& dst_dir, generation_type dst_gen, bool mark_for_removal) const;
future<> create_links_and_mark_for_removal(const sstring& dst_dir, generation_type dst_gen) const;
public:
future<> read_toc() noexcept;
@@ -912,6 +920,9 @@ public:
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state) const;
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state) const;
future<uint32_t> read_digest(io_priority_class pc);
future<checksum> read_checksum(io_priority_class pc);
};
// Validate checksums

View File

@@ -307,12 +307,13 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
require_exist(file_name, true);
};
auto temp_sst_dir = sst::temp_sst_dir(sst_dir, generation_from_value(2));
touch_dir(temp_sst_dir);
auto temp_sst_dir_2 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(2));
touch_dir(temp_sst_dir_2);
temp_sst_dir = sst::temp_sst_dir(sst_dir, generation_from_value(3));
touch_dir(temp_sst_dir);
auto temp_file_name = sst::filename(temp_sst_dir, ks, cf, sst::version_types::mc, generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC);
auto temp_sst_dir_3 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(3));
touch_dir(temp_sst_dir_3);
auto temp_file_name = sst::filename(temp_sst_dir_3, ks, cf, sst::version_types::mc, generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC);
touch_file(temp_file_name);
temp_file_name = sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::TemporaryTOC);
@@ -320,9 +321,9 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
temp_file_name = sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::Data);
touch_file(temp_file_name);
do_with_cql_env_thread([&sst_dir, &ks, &cf, &require_exist] (cql_test_env& e) {
require_exist(sst::temp_sst_dir(sst_dir, generation_from_value(2)), false);
require_exist(sst::temp_sst_dir(sst_dir, generation_from_value(3)), false);
do_with_cql_env_thread([&sst_dir, &ks, &cf, &require_exist, &temp_sst_dir_2, &temp_sst_dir_3] (cql_test_env& e) {
require_exist(temp_sst_dir_2, false);
require_exist(temp_sst_dir_3, false);
require_exist(sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::TemporaryTOC), false);
require_exist(sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::Data), false);
@@ -1160,7 +1161,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
}
// collect all expected sstable data files
for (auto sst : sstables) {
expected.insert(fs::path(sst->get_filename()).filename().native());
expected.insert(sst->component_basename(sstables::component_type::Data));
}
if (std::exchange(found, true)) {
co_return;

View File

@@ -2095,7 +2095,7 @@ SEASTAR_TEST_CASE(test_unknown_component) {
auto tmp = tmpdir();
copy_directory("test/resource/sstables/unknown_component", std::string(tmp.path().string()) + "/unknown_component");
auto sstp = env.reusable_sst(uncompressed_schema(), tmp.path().string() + "/unknown_component", 1).get0();
sstp->create_links(tmp.path().string()).get();
test::create_links(*sstp, tmp.path().string()).get();
// check that create_links() moved unknown component to new dir
BOOST_REQUIRE(file_exists(tmp.path().string() + "/la-1-big-UNKNOWN.txt").get0());
@@ -3143,7 +3143,7 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
valid = sstables::validate_checksums(sst, permit, default_priority_class()).get();
BOOST_REQUIRE(valid);
auto sst_file = open_file_dma(sst->get_filename(), open_flags::wo).get();
auto sst_file = open_file_dma(test::filename(*sst, sstables::component_type::Data).native(), open_flags::wo).get();
auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });
testlog.info("Validating corrupted {}", sst->get_filename());
@@ -3188,7 +3188,7 @@ SEASTAR_TEST_CASE(partial_sstable_deletion_test) {
auto sst = make_sstable_containing(sst_gen, {std::move(mut1)});
// Rename TOC into TMP toc, to stress deletion path for partial files
rename_file(sst->toc_filename(), sst->filename(sstables::component_type::TemporaryTOC)).get();
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
sst->unlink().get();
});

View File

@@ -183,7 +183,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) {
// Now there is one sstable to the upload directory, but it is incomplete and one component is missing.
// We should fail validation and leave the directory untouched
remove_file(sst->filename(sstables::component_type::Statistics)).get();
remove_file(test::filename(*sst, sstables::component_type::Statistics).native()).get();
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -220,7 +220,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_temporary_toc) {
return sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -236,7 +236,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
return sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
link_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
link_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -253,7 +253,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
remove_file(sst->filename(sstables::component_type::TOC)).get();
remove_file(test::filename(*sst, sstables::component_type::TOC).native()).get();
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -279,10 +279,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 1));
auto tempstr = sst->filename(component_type::TemporaryStatistics);
auto f = open_file_dma(tempstr, open_flags::rw | open_flags::create | open_flags::truncate).get0();
auto tempstr = test::filename(*sst, component_type::TemporaryStatistics);
auto f = open_file_dma(tempstr.native(), open_flags::rw | open_flags::create | open_flags::truncate).get0();
f.close().get();
auto tempstat = fs::canonical(fs::path(tempstr));
auto tempstat = fs::canonical(tempstr);
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -295,7 +295,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
}).get();
});
remove_file(sst->filename(sstables::component_type::Statistics)).get();
remove_file(test::filename(*sst, sstables::component_type::Statistics).native()).get();
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -312,7 +312,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
auto dir = tmpdir();
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 3333));
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 6666));
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
with_sstable_directory(dir.path(), 1,
sstable_from_existing_file(env),
@@ -415,7 +415,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) {
// Collect all sstable file names
sstdir.invoke_on_all([&] (sstable_directory& d) {
return d.do_for_each_sstable([&] (sstables::shared_sstable sst) {
sstables[this_shard_id()].push_back(sst->get_filename());
sstables[this_shard_id()].push_back(test::filename(*sst, sstables::component_type::Data).native());
return make_ready_future<>();
});
}).get();

View File

@@ -24,7 +24,7 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
auto dst_path = tmp_path / src_path.filename() / format("gen-{}", gen);
recursive_touch_directory(dst_path.native()).get();
for (auto p : sst->all_components()) {
auto src_path = fs::path(sst->filename(p.first));
auto src_path = test::filename(*sst, p.first);
copy_file(src_path, dst_path / src_path.filename());
}
return std::make_pair(env.reusable_sst(schema_ptr, dst_path.native(), gen).get0(), dst_path.native());
@@ -64,14 +64,14 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
static bool partial_create_links(sstable_ptr sst, fs::path dst_path, int64_t gen, int count) {
auto schema = sst->get_schema();
auto tmp_toc = sstable::filename(dst_path.native(), schema->ks_name(), schema->cf_name(), sst->get_version(), generation_from_value(gen), sstable_format_types::big, component_type::TemporaryTOC);
link_file(sst->filename(component_type::TOC), tmp_toc).get();
link_file(test::filename(*sst, component_type::TOC).native(), tmp_toc).get();
for (auto& [c, s] : sst->all_components()) {
if (count-- <= 0) {
return false;
}
auto src = sst->filename(c);
auto src = test::filename(*sst, c);
auto dst = sstable::filename(dst_path.native(), schema->ks_name(), schema->cf_name(), sst->get_version(), generation_from_value(gen), sstable_format_types::big, c);
link_file(src, dst).get();
link_file(src.native(), dst).get();
}
if (count-- <= 0) {
return false;

View File

@@ -483,7 +483,7 @@ test_sstable_exists(sstring dir, unsigned long generation, bool exists) {
SEASTAR_TEST_CASE(statistics_rewrite) {
return test_setup::do_with_cloned_tmp_directory(uncompressed_dir(), [] (test_env& env, sstring uncompressed_dir, sstring generation_dir) {
return env.reusable_sst(uncompressed_schema(), uncompressed_dir, 1).then([generation_dir] (auto sstp) {
return sstp->create_links(generation_dir).then([sstp] {});
return test::create_links(*sstp, generation_dir).then([sstp] {});
}).then([generation_dir] {
return test_sstable_exists(generation_dir, 1, true);
}).then([&env, generation_dir] {

View File

@@ -139,7 +139,7 @@ public:
}
void change_dir(sstring dir) {
_sst->_dir = dir;
_sst->_storage.dir = dir;
}
void set_data_file_size(uint64_t size) {
@@ -158,12 +158,12 @@ public:
_sst->_recognized_components.erase(component_type::Index);
_sst->_recognized_components.erase(component_type::Data);
return seastar::async([sst = _sst] {
sst->write_toc(default_priority_class());
sst->open_sstable(default_priority_class());
sst->write_statistics(default_priority_class());
sst->write_compression(default_priority_class());
sst->write_filter(default_priority_class());
sst->write_summary(default_priority_class());
sst->seal_sstable().get();
sst->seal_sstable(false).get();
});
}
@@ -201,8 +201,8 @@ public:
void rewrite_toc_without_scylla_component() {
_sst->_recognized_components.erase(component_type::Scylla);
remove_file(_sst->filename(component_type::TOC)).get();
_sst->write_toc(default_priority_class());
_sst->seal_sstable().get();
_sst->_storage.open(*_sst, default_priority_class());
_sst->seal_sstable(false).get();
}
future<> remove_component(component_type c) {
@@ -216,6 +216,14 @@ public:
void set_shards(std::vector<unsigned> shards) {
_sst->_shards = std::move(shards);
}
static future<> create_links(const sstable& sst, const sstring& dir) {
return sst._storage.create_links(sst, dir);
}
static fs::path filename(const sstable& sst, component_type c) {
return fs::path(sst.filename(c));
}
};
inline auto replacer_fn_no_op() {