Merge 'Encapsulate filesystem access by sstable into filesystem_storage subsclass' from Pavel Emelyanov
This is to define the API sstable needs from underlying storage. When implementing object-storage backend it will need to implement those. The API looks like
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const;
future<> quarantine(const sstable& sst, delayed_commit_changes* delay);
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay);
void open(sstable& sst, const io_priority_class& pc); // runs in async context
future<> wipe(const sstable& sst) noexcept;
future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity);
It doesn't have "list" or alike, because it's not a method of an individual sstable, but rather the one from sstables_manager. It will come as separate PR.
Closes #12217
* github.com:scylladb/scylladb:
sstable, storage: Mark dir/temp_dir private
sstable: Remove get_dir() (well, almost)
sstable: Add quarantine() method to storage
sstable: Use absolute/relative path marking for snapshot()
sstable: Remove temp_... stuff from sstable
sstable: Move open_component() on storage
sstable: Mark rename_new_sstable_component_file() const
sstable: Print filename(type) on open-component error
sstable: Reorganize new_sstable_component_file()
sstable: Mark filename() private
sstable: Introduce index_filename()
tests: Disclosure private filename() calls
sstable: Move wipe_storage() on storage
sstable: Remove temp dir in wipe_storage()
sstable: Move unlink parts into wipe_storage
sstable: Remove get_temp_dir()
sstable: Move write_toc() to storage
sstable: Shuffle open_sstable()
sstable: Move touch_temp_dir() to storage
sstable: Move move() to storage
sstable: Move create_links() to storage
sstable: Move seal_sstable() to storage
sstable: Tossing internals of seal_sstable()
sstable: Move remove_temp_dir() to storage
sstable: Move create_links_common() to storage
sstable: Move check_create_links_replay() to storage
sstable: Remove one of create_links() overloads
sstable: Remove create_links_and_mark_for_removal()
sstable: Indentation fix after prevuous patch
sstable: Coroutinize create_links_common()
sstable: Rename create_links_common()'s "dir" argument
sstable: Make mark_for_removal bool_class
sstable, table: Add sstable::snapshot() and use in table::take_snapshot
sstable: Move _dir and _temp_dir on filesystem_storage
sstable: Use sync_directory() method
test, sstable: Use component_basename in test
sstables: Move read_{digest|checksum} on sstable
This commit is contained in:
@@ -1501,7 +1501,7 @@ future<table::snapshot_file_set> table::take_snapshot(database& db, sstring json
|
||||
table_names->insert(sstable->component_basename(sstables::component_type::Data));
|
||||
return with_semaphore(db.get_sharded_sst_dir_semaphore().local()._sem, 1, [&jsondir, sstable] {
|
||||
return io_check([sstable, &dir = jsondir] {
|
||||
return sstable->create_links(dir);
|
||||
return sstable->snapshot(dir);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -157,10 +157,10 @@ private:
|
||||
public:
|
||||
void verify_end_state() const {
|
||||
if (this->_remain > 0) {
|
||||
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): parsing ended but there is unconsumed data", _state), _sst.filename(component_type::Index));
|
||||
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): parsing ended but there is unconsumed data", _state), _sst.index_filename());
|
||||
}
|
||||
if (_state != state::KEY_SIZE && _state != state::START) {
|
||||
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): cannot finish parsing current entry, no more data", _state), _sst.filename(component_type::Index));
|
||||
throw malformed_sstable_exception(fmt::format("index_consume_entry_context (state={}): cannot finish parsing current entry, no more data", _state), _sst.index_filename());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -308,7 +308,7 @@ inline file make_tracked_index_file(sstable& sst, reader_permit permit, tracing:
|
||||
if (!trace_state) {
|
||||
return f;
|
||||
}
|
||||
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.filename(component_type::Index)));
|
||||
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.index_filename()));
|
||||
}
|
||||
|
||||
inline
|
||||
@@ -541,7 +541,7 @@ private:
|
||||
bound.current_index_idx = 0;
|
||||
bound.current_pi_idx = 0;
|
||||
if (bound.current_list->empty()) {
|
||||
throw malformed_sstable_exception(format("missing index entry for summary index {} (bound {})", summary_idx, fmt::ptr(&bound)), _sstable->filename(component_type::Index));
|
||||
throw malformed_sstable_exception(format("missing index entry for summary index {} (bound {})", summary_idx, fmt::ptr(&bound)), _sstable->index_filename());
|
||||
}
|
||||
bound.data_file_position = bound.current_list->_entries[0]->position();
|
||||
bound.element = indexable_element::partition;
|
||||
|
||||
@@ -787,8 +787,7 @@ public:
|
||||
// exactly what callers used to do anyway.
|
||||
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
|
||||
|
||||
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
|
||||
_sst.write_toc(_pc);
|
||||
_sst.open_sstable(_pc);
|
||||
_sst.create_data().get();
|
||||
_compression_enabled = !_sst.has_component(component_type::CRC);
|
||||
init_file_writers();
|
||||
|
||||
@@ -488,11 +488,11 @@ future<> sstable_directory::delete_atomically(std::vector<shared_sstable> ssts)
|
||||
gen_tracker.update(sst->generation());
|
||||
|
||||
if (sstdir.empty()) {
|
||||
sstdir = sst->get_dir();
|
||||
sstdir = sst->_storage.prefix();
|
||||
} else {
|
||||
// All sstables are assumed to be in the same column_family, hence
|
||||
// sharing their base directory.
|
||||
assert (sstdir == sst->get_dir());
|
||||
assert (sstdir == sst->_storage.prefix());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -133,21 +133,35 @@ static future<file> open_sstable_component_file_non_checked(std::string_view nam
|
||||
return open_file_dma(name, flags, options);
|
||||
}
|
||||
|
||||
future<> sstable::rename_new_sstable_component_file(sstring from_name, sstring to_name) {
|
||||
future<> sstable::rename_new_sstable_component_file(sstring from_name, sstring to_name) const {
|
||||
return sstable_write_io_check(rename_file, from_name, to_name).handle_exception([from_name, to_name] (std::exception_ptr ep) {
|
||||
sstlog.error("Could not rename SSTable component {} to {}. Found exception: {}", from_name, to_name, ep);
|
||||
return make_exception_future<>(ep);
|
||||
});
|
||||
}
|
||||
|
||||
future<file> sstable::new_sstable_component_file(const io_error_handler& error_handler, component_type type, open_flags flags, file_open_options options) noexcept {
|
||||
try {
|
||||
future<file> sstable::filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
|
||||
auto create_flags = open_flags::create | open_flags::exclusive;
|
||||
auto readonly = (flags & create_flags) != create_flags;
|
||||
auto name = !readonly && _temp_dir ? temp_filename(type) : filename(type);
|
||||
auto tgt_dir = !readonly && temp_dir ? dir + "/" + sstable::sst_dir_basename(sst._generation) : dir;
|
||||
auto name = sst.filename(tgt_dir, type);
|
||||
|
||||
auto f = open_sstable_component_file_non_checked(name, flags, options,
|
||||
_manager.config().enable_sstable_data_integrity_check());
|
||||
auto f = open_sstable_component_file_non_checked(name, flags, options, check_integrity);
|
||||
|
||||
if (!readonly) {
|
||||
f = with_file_close_on_failure(std::move(f), [&sst, type, name = std::move(name)] (file fd) mutable {
|
||||
return sst.rename_new_sstable_component_file(name, sst.filename(type)).then([fd = std::move(fd)] () mutable {
|
||||
return make_ready_future<file>(std::move(fd));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return f;
|
||||
}
|
||||
|
||||
future<file> sstable::new_sstable_component_file(const io_error_handler& error_handler, component_type type, open_flags flags, file_open_options options) noexcept {
|
||||
try {
|
||||
auto f = _storage.open_component(*this, type, flags, options, _manager.config().enable_sstable_data_integrity_check());
|
||||
|
||||
if (type != component_type::TOC && type != component_type::TemporaryTOC) {
|
||||
for (auto * ext : _manager.config().extensions().sstable_file_io_extensions()) {
|
||||
@@ -163,17 +177,10 @@ future<file> sstable::new_sstable_component_file(const io_error_handler& error_h
|
||||
return make_checked_file(error_handler, std::move(f));
|
||||
});
|
||||
|
||||
if (!readonly) {
|
||||
f = with_file_close_on_failure(std::move(f).handle_exception([name] (auto ep) {
|
||||
sstlog.error("Could not create SSTable component {}. Found exception: {}", name, ep);
|
||||
return make_exception_future<file>(ep);
|
||||
}), [this, type, name = std::move(name)] (file fd) mutable {
|
||||
return rename_new_sstable_component_file(name, filename(type)).then([fd = std::move(fd)] () mutable {
|
||||
return make_ready_future<file>(std::move(fd));
|
||||
});
|
||||
});
|
||||
}
|
||||
return f;
|
||||
return f.handle_exception([this, type] (auto ep) {
|
||||
sstlog.error("Could not create SSTable component {}. Found exception: {}", filename(type), ep);
|
||||
return make_exception_future<file>(ep);
|
||||
});
|
||||
} catch (...) {
|
||||
return current_exception_as_future<file>();
|
||||
}
|
||||
@@ -867,7 +874,7 @@ future<> sstable::read_toc() noexcept {
|
||||
|
||||
}
|
||||
|
||||
void sstable::generate_toc(compressor_ptr c, double filter_fp_chance) {
|
||||
void sstable::generate_toc() {
|
||||
// Creating table of components.
|
||||
_recognized_components.insert(component_type::TOC);
|
||||
_recognized_components.insert(component_type::Statistics);
|
||||
@@ -875,10 +882,10 @@ void sstable::generate_toc(compressor_ptr c, double filter_fp_chance) {
|
||||
_recognized_components.insert(component_type::Index);
|
||||
_recognized_components.insert(component_type::Summary);
|
||||
_recognized_components.insert(component_type::Data);
|
||||
if (filter_fp_chance != 1.0) {
|
||||
if (_schema->bloom_filter_fp_chance() != 1.0) {
|
||||
_recognized_components.insert(component_type::Filter);
|
||||
}
|
||||
if (c == nullptr) {
|
||||
if (_schema->get_compressor_params().get_compressor() == nullptr) {
|
||||
_recognized_components.insert(component_type::CRC);
|
||||
} else {
|
||||
_recognized_components.insert(component_type::CompressionInfo);
|
||||
@@ -926,9 +933,14 @@ future<file_writer> sstable::make_component_file_writer(component_type c, file_o
|
||||
});
|
||||
}
|
||||
|
||||
void sstable::write_toc(const io_priority_class& pc) {
|
||||
touch_temp_dir().get0();
|
||||
auto file_path = filename(component_type::TemporaryTOC);
|
||||
void sstable::open_sstable(const io_priority_class& pc) {
|
||||
generate_toc();
|
||||
_storage.open(*this, pc);
|
||||
}
|
||||
|
||||
void sstable::filesystem_storage::open(sstable& sst, const io_priority_class& pc) {
|
||||
touch_temp_dir(sst).get0();
|
||||
auto file_path = sst.filename(component_type::TemporaryTOC);
|
||||
|
||||
sstlog.debug("Writing TOC file {} ", file_path);
|
||||
|
||||
@@ -939,51 +951,44 @@ void sstable::write_toc(const io_priority_class& pc) {
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
options.io_priority_class = pc;
|
||||
auto w = make_component_file_writer(component_type::TemporaryTOC, std::move(options)).get0();
|
||||
auto w = sst.make_component_file_writer(component_type::TemporaryTOC, std::move(options)).get0();
|
||||
|
||||
bool toc_exists = file_exists(filename(component_type::TOC)).get0();
|
||||
bool toc_exists = file_exists(sst.filename(component_type::TOC)).get0();
|
||||
if (toc_exists) {
|
||||
// TOC will exist at this point if write_components() was called with
|
||||
// the generation of a sstable that exists.
|
||||
w.close();
|
||||
remove_file(file_path).get();
|
||||
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {:d} of {}.{}", _generation, _schema->ks_name(), _schema->cf_name()));
|
||||
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {:d} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name()));
|
||||
}
|
||||
|
||||
for (auto&& key : _recognized_components) {
|
||||
for (auto&& key : sst._recognized_components) {
|
||||
// new line character is appended to the end of each component name.
|
||||
auto value = sstable_version_constants::get_component_map(_version).at(key) + "\n";
|
||||
auto value = sstable_version_constants::get_component_map(sst._version).at(key) + "\n";
|
||||
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
|
||||
write(_version, w, b);
|
||||
write(sst._version, w, b);
|
||||
}
|
||||
w.flush();
|
||||
w.close();
|
||||
|
||||
// Flushing parent directory to guarantee that temporary TOC file reached
|
||||
// the disk.
|
||||
file dir_f = open_checked_directory(_write_error_handler, _dir).get0();
|
||||
sstable_write_io_check([&] {
|
||||
dir_f.flush().get();
|
||||
dir_f.close().get();
|
||||
});
|
||||
sst.sstable_write_io_check(sync_directory, dir).get();
|
||||
}
|
||||
|
||||
future<> sstable::seal_sstable() {
|
||||
future<> sstable::filesystem_storage::seal(const sstable& sst) {
|
||||
// SSTable sealing is about renaming temporary TOC file after guaranteeing
|
||||
// that each component reached the disk safely.
|
||||
co_await remove_temp_dir();
|
||||
auto dir_f = co_await open_checked_directory(_write_error_handler, _dir);
|
||||
auto dir_f = co_await open_checked_directory(sst._write_error_handler, dir);
|
||||
// Guarantee that every component of this sstable reached the disk.
|
||||
co_await sstable_write_io_check([&] { return dir_f.flush(); });
|
||||
co_await sst.sstable_write_io_check([&] { return dir_f.flush(); });
|
||||
// Rename TOC because it's no longer temporary.
|
||||
co_await sstable_write_io_check(rename_file, filename(component_type::TemporaryTOC), filename(component_type::TOC));
|
||||
co_await sstable_write_io_check([&] { return dir_f.flush(); });
|
||||
co_await sstable_write_io_check([&] { return dir_f.close(); });
|
||||
if (_marked_for_deletion == mark_for_deletion::implicit) {
|
||||
_marked_for_deletion = mark_for_deletion::none;
|
||||
}
|
||||
co_await sst.sstable_write_io_check(rename_file, sst.filename(component_type::TemporaryTOC), sst.filename(component_type::TOC));
|
||||
co_await sst.sstable_write_io_check([&] { return dir_f.flush(); });
|
||||
co_await sst.sstable_write_io_check([&] { return dir_f.close(); });
|
||||
// If this point was reached, sstable should be safe in disk.
|
||||
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _schema->ks_name(), _schema->cf_name());
|
||||
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", sst._generation, sst._schema->ks_name(), sst._schema->cf_name());
|
||||
}
|
||||
|
||||
void sstable::write_crc(const checksum& c) {
|
||||
@@ -1755,13 +1760,12 @@ bool sstable::may_contain_rows(const query::clustering_row_ranges& ranges) const
|
||||
|
||||
future<> sstable::seal_sstable(bool backup)
|
||||
{
|
||||
return seal_sstable().then([this, backup] {
|
||||
return _storage.seal(*this).then([this, backup] {
|
||||
if (_marked_for_deletion == mark_for_deletion::implicit) {
|
||||
_marked_for_deletion = mark_for_deletion::none;
|
||||
}
|
||||
if (backup) {
|
||||
auto dir = get_dir() + "/backups/";
|
||||
auto fut = sstable_touch_directory_io_check(dir);
|
||||
return fut.then([this, dir = std::move(dir)] () mutable {
|
||||
return create_links(std::move(dir));
|
||||
});
|
||||
return _storage.snapshot(*this, "backups", filesystem_storage::absolute_path::no);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -1962,26 +1966,26 @@ void sstable::validate_originating_host_id() const {
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable::touch_temp_dir() {
|
||||
if (_temp_dir) {
|
||||
future<> sstable::filesystem_storage::touch_temp_dir(const sstable& sst) {
|
||||
if (temp_dir) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto temp_dir = get_temp_dir();
|
||||
sstlog.debug("Touching temp_dir={}", temp_dir);
|
||||
auto fut = sstable_touch_directory_io_check(temp_dir);
|
||||
return fut.then([this, temp_dir = std::move(temp_dir)] () mutable {
|
||||
_temp_dir = std::move(temp_dir);
|
||||
auto tmp = dir + "/" + sstable::sst_dir_basename(sst._generation);
|
||||
sstlog.debug("Touching temp_dir={}", tmp);
|
||||
auto fut = sst.sstable_touch_directory_io_check(tmp);
|
||||
return fut.then([this, tmp = std::move(tmp)] () mutable {
|
||||
temp_dir = std::move(tmp);
|
||||
});
|
||||
}
|
||||
|
||||
future<> sstable::remove_temp_dir() {
|
||||
if (!_temp_dir) {
|
||||
future<> sstable::filesystem_storage::remove_temp_dir() {
|
||||
if (!temp_dir) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
sstlog.debug("Removing temp_dir={}", _temp_dir);
|
||||
return remove_file(*_temp_dir).then_wrapped([this] (future<> f) {
|
||||
sstlog.debug("Removing temp_dir={}", temp_dir);
|
||||
return remove_file(*temp_dir).then_wrapped([this] (future<> f) {
|
||||
if (!f.failed()) {
|
||||
_temp_dir.reset();
|
||||
temp_dir.reset();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto ep = f.get_exception();
|
||||
@@ -2001,15 +2005,15 @@ std::vector<sstring> sstable::component_filenames() const {
|
||||
}
|
||||
|
||||
bool sstable::requires_view_building() const {
|
||||
return boost::algorithm::ends_with(_dir, staging_dir);
|
||||
return boost::algorithm::ends_with(_storage.prefix(), staging_dir);
|
||||
}
|
||||
|
||||
bool sstable::is_quarantined() const noexcept {
|
||||
return boost::algorithm::ends_with(_dir, quarantine_dir);
|
||||
return boost::algorithm::ends_with(_storage.prefix(), quarantine_dir);
|
||||
}
|
||||
|
||||
bool sstable::is_uploaded() const noexcept {
|
||||
return boost::algorithm::ends_with(_dir, upload_dir);
|
||||
return boost::algorithm::ends_with(_storage.prefix(), upload_dir);
|
||||
}
|
||||
|
||||
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
|
||||
@@ -2096,12 +2100,12 @@ future<> idempotent_link_file(sstring oldpath, sstring newpath) noexcept {
|
||||
// from staging to the base dir, for example, right after create_links completes,
|
||||
// and right before deleting the source links.
|
||||
// We end up in two valid sstables in this case, so make create_links idempotent.
|
||||
future<> sstable::check_create_links_replay(const sstring& dst_dir, generation_type dst_gen,
|
||||
future<> sstable::filesystem_storage::check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen,
|
||||
const std::vector<std::pair<sstables::component_type, sstring>>& comps) const {
|
||||
return parallel_for_each(comps, [this, &dst_dir, dst_gen] (const auto& p) mutable {
|
||||
return parallel_for_each(comps, [this, &sst, &dst_dir, dst_gen] (const auto& p) mutable {
|
||||
auto comp = p.second;
|
||||
auto src = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, comp);
|
||||
auto dst = sstable::filename(dst_dir, _schema->ks_name(), _schema->cf_name(), _version, dst_gen, _format, comp);
|
||||
auto src = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, comp);
|
||||
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, dst_gen, sst._format, comp);
|
||||
return do_with(std::move(src), std::move(dst), [this] (const sstring& src, const sstring& dst) mutable {
|
||||
return file_exists(dst).then([&, this] (bool exists) mutable {
|
||||
if (!exists) {
|
||||
@@ -2117,7 +2121,7 @@ future<> sstable::check_create_links_replay(const sstring& dst_dir, generation_t
|
||||
if (!same) {
|
||||
auto msg = format("Error while linking SSTable: {} to {}: File exists", src, dst);
|
||||
sstlog.error("{}", msg);
|
||||
return make_exception_future<>(malformed_sstable_exception(msg, _dir));
|
||||
return make_exception_future<>(malformed_sstable_exception(msg, dir));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -2161,90 +2165,100 @@ future<> sstable::check_create_links_replay(const sstring& dst_dir, generation_t
|
||||
/// are used. These versions handle EEXIST errors that may happen
|
||||
/// when the respective operations are replayed.
|
||||
///
|
||||
/// \param dir - the destination directory.
|
||||
/// \param sst - the sstable to work on
|
||||
/// \param dst_dir - the destination directory.
|
||||
/// \param generation - the generation of the destination sstable
|
||||
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dir
|
||||
future<> sstable::create_links_common(const sstring& dir, generation_type generation, bool mark_for_removal) const {
|
||||
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", get_filename(), dir, generation, mark_for_removal);
|
||||
return do_with(dir, all_components(), [this, generation, mark_for_removal] (const sstring& dir, auto& comps) {
|
||||
return check_create_links_replay(dir, generation, comps).then([this, &dir, generation, &comps, mark_for_removal] {
|
||||
// TemporaryTOC is always first, TOC is always last
|
||||
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, component_type::TemporaryTOC);
|
||||
return sstable_write_io_check(idempotent_link_file, filename(component_type::TOC), std::move(dst)).then([this, &dir] {
|
||||
return sstable_write_io_check(sync_directory, dir);
|
||||
}).then([this, &dir, generation, &comps] {
|
||||
return parallel_for_each(comps, [this, &dir, generation] (auto p) {
|
||||
auto src = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, p.second);
|
||||
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, p.second);
|
||||
return sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
|
||||
});
|
||||
}).then([this, &dir] {
|
||||
return sstable_write_io_check(sync_directory, dir);
|
||||
});
|
||||
}).then([this, &dir, generation, mark_for_removal] {
|
||||
auto dst_temp_toc = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, component_type::TemporaryTOC);
|
||||
if (mark_for_removal) {
|
||||
// Now that the source sstable is linked to new_dir, mark the source links for
|
||||
// deletion by leaving a TemporaryTOC file in the source directory.
|
||||
auto src_temp_toc = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, component_type::TemporaryTOC);
|
||||
return sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc)).then([this] {
|
||||
return sstable_write_io_check(sync_directory, _dir);
|
||||
});
|
||||
} else {
|
||||
// Now that the source sstable is linked to dir, remove
|
||||
// the TemporaryTOC file at the destination.
|
||||
return sstable_write_io_check(remove_file, std::move(dst_temp_toc));
|
||||
}
|
||||
}).then([this, &dir] {
|
||||
return sstable_write_io_check(sync_directory, dir);
|
||||
}).then([this, &dir, generation] {
|
||||
sstlog.trace("create_links: {} -> {} generation={}: done", get_filename(), dir, generation);
|
||||
});
|
||||
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
|
||||
future<> sstable::filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, mark_for_removal mark_for_removal) const {
|
||||
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
|
||||
auto comps = sst.all_components();
|
||||
co_await check_create_links_replay(sst, dst_dir, generation, comps);
|
||||
// TemporaryTOC is always first, TOC is always last
|
||||
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, component_type::TemporaryTOC);
|
||||
co_await sst.sstable_write_io_check(idempotent_link_file, sst.filename(component_type::TOC), std::move(dst));
|
||||
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
|
||||
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
|
||||
auto src = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, p.second);
|
||||
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, p.second);
|
||||
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
|
||||
});
|
||||
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
|
||||
auto dst_temp_toc = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, component_type::TemporaryTOC);
|
||||
if (mark_for_removal) {
|
||||
// Now that the source sstable is linked to new_dir, mark the source links for
|
||||
// deletion by leaving a TemporaryTOC file in the source directory.
|
||||
auto src_temp_toc = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, component_type::TemporaryTOC);
|
||||
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
|
||||
co_await sst.sstable_write_io_check(sync_directory, dir);
|
||||
} else {
|
||||
// Now that the source sstable is linked to dir, remove
|
||||
// the TemporaryTOC file at the destination.
|
||||
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
|
||||
}
|
||||
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
|
||||
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
|
||||
}
|
||||
|
||||
future<> sstable::create_links(const sstring& dir, generation_type generation) const {
|
||||
return create_links_common(dir, generation, false /* mark_for_removal */);
|
||||
future<> sstable::filesystem_storage::create_links(const sstable& sst, const sstring& dir) const {
|
||||
return create_links_common(sst, dir, sst._generation, mark_for_removal::no);
|
||||
}
|
||||
|
||||
future<> sstable::create_links_and_mark_for_removal(const sstring& dir, generation_type generation) const {
|
||||
return create_links_common(dir, generation, true /* mark_for_removal */);
|
||||
future<> sstable::filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs) const {
|
||||
if (!abs) {
|
||||
dir = this->dir + "/" + dir + "/";
|
||||
}
|
||||
co_await sst.sstable_touch_directory_io_check(dir);
|
||||
co_await create_links(sst, dir);
|
||||
}
|
||||
|
||||
future<> sstable::move_to_new_dir(sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
|
||||
sstring old_dir = get_dir();
|
||||
future<> sstable::snapshot(const sstring& dir) const {
|
||||
return _storage.snapshot(*this, dir, filesystem_storage::absolute_path::yes);
|
||||
}
|
||||
|
||||
future<> sstable::filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
|
||||
co_await touch_directory(new_dir);
|
||||
sstring old_dir = dir;
|
||||
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
|
||||
get_filename(), _generation, new_dir, new_generation, delay_commit == nullptr);
|
||||
co_await create_links_and_mark_for_removal(new_dir, new_generation);
|
||||
_dir = new_dir;
|
||||
generation_type old_generation = std::exchange(_generation, new_generation);
|
||||
co_await coroutine::parallel_for_each(all_components(), [this, old_generation, old_dir] (auto p) {
|
||||
return sstable_write_io_check(remove_file, sstable::filename(old_dir, _schema->ks_name(), _schema->cf_name(), _version, old_generation, _format, p.second));
|
||||
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
|
||||
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
|
||||
dir = new_dir;
|
||||
generation_type old_generation = sst._generation;
|
||||
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
|
||||
return sst.sstable_write_io_check(remove_file, sstable::filename(old_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, old_generation, sst._format, p.second));
|
||||
});
|
||||
auto temp_toc = sstable_version_constants::get_component_map(_version).at(component_type::TemporaryTOC);
|
||||
co_await sstable_write_io_check(remove_file, sstable::filename(old_dir, _schema->ks_name(), _schema->cf_name(), _version, old_generation, _format, temp_toc));
|
||||
auto temp_toc = sstable_version_constants::get_component_map(sst._version).at(component_type::TemporaryTOC);
|
||||
co_await sst.sstable_write_io_check(remove_file, sstable::filename(old_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, old_generation, sst._format, temp_toc));
|
||||
if (delay_commit == nullptr) {
|
||||
co_await when_all(sstable_write_io_check(sync_directory, old_dir), sstable_write_io_check(sync_directory, new_dir)).discard_result();
|
||||
co_await when_all(sst.sstable_write_io_check(sync_directory, old_dir), sst.sstable_write_io_check(sync_directory, new_dir)).discard_result();
|
||||
} else {
|
||||
delay_commit->_dirs.insert(old_dir);
|
||||
delay_commit->_dirs.insert(new_dir);
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable::move_to_quarantine(delayed_commit_changes* delay_commit) {
|
||||
auto path = fs::path(_dir);
|
||||
sstring basename = path.filename().native();
|
||||
if (basename == quarantine_dir) {
|
||||
co_return;
|
||||
} else if (basename == staging_dir) {
|
||||
future<> sstable::move_to_new_dir(sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
|
||||
co_await _storage.move(*this, std::move(new_dir), new_generation, delay_commit);
|
||||
_generation = std::move(new_generation);
|
||||
}
|
||||
|
||||
future<> sstable::filesystem_storage::quarantine(const sstable& sst, delayed_commit_changes* delay_commit) {
|
||||
auto path = fs::path(dir);
|
||||
if (path.filename().native() == staging_dir) {
|
||||
path = path.parent_path();
|
||||
}
|
||||
// Note: moving a sstable in a snapshot or in the uploads dir to quarantine
|
||||
// will move it into a "quarantine" subdirectory of its current directory.
|
||||
auto new_dir = (path / sstables::quarantine_dir).native();
|
||||
sstlog.info("Moving SSTable {} to quarantine in {}", get_filename(), new_dir);
|
||||
co_await touch_directory(new_dir);
|
||||
co_await move_to_new_dir(std::move(new_dir), generation(), delay_commit);
|
||||
sstlog.info("Moving SSTable {} to quarantine in {}", sst.get_filename(), new_dir);
|
||||
co_await move(sst, std::move(new_dir), sst.generation(), delay_commit);
|
||||
}
|
||||
|
||||
future<> sstable::move_to_quarantine(delayed_commit_changes* delay_commit) {
|
||||
if (is_quarantined()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return _storage.quarantine(*this, delay_commit);
|
||||
}
|
||||
|
||||
future<> sstable::delayed_commit_changes::commit() {
|
||||
@@ -2528,8 +2542,12 @@ static future<bool> do_validate_uncompressed(input_stream<char>& stream, const c
|
||||
co_return valid;
|
||||
}
|
||||
|
||||
static future<uint32_t> read_digest(sstring filename, file_input_stream_options options) {
|
||||
auto digest_file = co_await open_file_dma(filename, open_flags::ro);
|
||||
future<uint32_t> sstable::read_digest(io_priority_class pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
options.io_priority_class = pc;
|
||||
|
||||
auto digest_file = co_await open_file_dma(filename(component_type::Digest), open_flags::ro);
|
||||
auto digest_stream = make_file_input_stream(std::move(digest_file), options);
|
||||
|
||||
std::exception_ptr ex;
|
||||
@@ -2547,8 +2565,12 @@ static future<uint32_t> read_digest(sstring filename, file_input_stream_options
|
||||
co_return boost::lexical_cast<uint32_t>(digest_str);
|
||||
}
|
||||
|
||||
static future<checksum> read_checksum(sstring filename, file_input_stream_options options) {
|
||||
auto crc_file = co_await open_file_dma(filename, open_flags::ro);
|
||||
future<checksum> sstable::read_checksum(io_priority_class pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
options.io_priority_class = pc;
|
||||
|
||||
auto crc_file = co_await open_file_dma(filename(component_type::CRC), open_flags::ro);
|
||||
auto crc_stream = make_file_input_stream(std::move(crc_file), options);
|
||||
|
||||
checksum checksum;
|
||||
@@ -2578,11 +2600,7 @@ static future<checksum> read_checksum(sstring filename, file_input_stream_option
|
||||
}
|
||||
|
||||
future<bool> validate_checksums(shared_sstable sst, reader_permit permit, const io_priority_class& pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
options.io_priority_class = pc;
|
||||
|
||||
const auto digest = co_await read_digest(sst->filename(component_type::Digest), options);
|
||||
const auto digest = co_await sst->read_digest(pc);
|
||||
|
||||
auto data_stream = sst->data_stream(0, sst->ondisk_data_size(), pc, permit, nullptr, nullptr, sstable::raw_stream::yes);
|
||||
|
||||
@@ -2597,7 +2615,7 @@ future<bool> validate_checksums(shared_sstable sst, reader_permit permit, const
|
||||
valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, digest);
|
||||
}
|
||||
} else {
|
||||
auto checksum = co_await read_checksum(sst->filename(component_type::CRC), options);
|
||||
auto checksum = co_await sst->read_checksum(pc);
|
||||
if (sst->get_version() >= sstable_version_types::mc) {
|
||||
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, checksum, digest);
|
||||
} else {
|
||||
@@ -2754,20 +2772,6 @@ future<> sstable::close_files() {
|
||||
} catch (...) {
|
||||
sstlog.warn("Exception when deleting sstable file: {}", std::current_exception());
|
||||
}
|
||||
|
||||
if (_temp_dir) {
|
||||
try {
|
||||
unlinked_temp_dir = recursive_remove_directory(fs::path(*_temp_dir)).then_wrapped([this] (future<> f) {
|
||||
if (f.failed()) {
|
||||
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, f.get_exception());
|
||||
} else {
|
||||
_temp_dir.reset();
|
||||
}
|
||||
});
|
||||
} catch (...) {
|
||||
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_on_closed(*this);
|
||||
@@ -3006,37 +3010,47 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
|
||||
return utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(s, key)));
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable::unlink() noexcept {
|
||||
future<> sstable::filesystem_storage::wipe(const sstable& sst) noexcept {
|
||||
// We must be able to generate toc_filename()
|
||||
// in order to delete the sstable.
|
||||
// Running out of memory here will terminate.
|
||||
auto name = [this] () noexcept {
|
||||
auto name = [&sst] () noexcept {
|
||||
memory::scoped_critical_alloc_section _;
|
||||
return toc_filename();
|
||||
return sst.toc_filename();
|
||||
}();
|
||||
|
||||
// remove_by_toc_name doesn't throw
|
||||
auto fut = remove_by_toc_name(name);
|
||||
// remove_fut never fails
|
||||
auto remove_fut = fut.then_wrapped([&name] (future<> f) {
|
||||
if (f.failed()) {
|
||||
// Log and ignore the failure since there is nothing much we can do about it at this point.
|
||||
// a. Compaction will retry deleting the sstable in the next pass, and
|
||||
// b. in the future sstables_manager is planned to handle sstables deletion.
|
||||
// c. Eventually we may want to record these failures in a system table
|
||||
// and notify the administrator about that for manual handling (rather than aborting).
|
||||
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, f.get_exception());
|
||||
try {
|
||||
co_await remove_by_toc_name(name);
|
||||
} catch (...) {
|
||||
// Log and ignore the failure since there is nothing much we can do about it at this point.
|
||||
// a. Compaction will retry deleting the sstable in the next pass, and
|
||||
// b. in the future sstables_manager is planned to handle sstables deletion.
|
||||
// c. Eventually we may want to record these failures in a system table
|
||||
// and notify the administrator about that for manual handling (rather than aborting).
|
||||
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, std::current_exception());
|
||||
}
|
||||
|
||||
if (temp_dir) {
|
||||
try {
|
||||
co_await recursive_remove_directory(fs::path(*temp_dir));
|
||||
temp_dir.reset();
|
||||
} catch (...) {
|
||||
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *temp_dir, std::current_exception());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable::unlink() noexcept {
|
||||
auto remove_fut = _storage.wipe(*this);
|
||||
|
||||
try {
|
||||
co_await get_large_data_handler().maybe_delete_large_data_entries(shared_from_this());
|
||||
} catch (...) {
|
||||
memory::scoped_critical_alloc_section _;
|
||||
// Just log and ignore failures to delete large data entries.
|
||||
// They are not critical to the operation of the database.
|
||||
sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", name, std::current_exception());
|
||||
sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", toc_filename(), std::current_exception());
|
||||
}
|
||||
|
||||
co_await std::move(remove_fut);
|
||||
@@ -3185,8 +3199,8 @@ sstable::sstable(schema_ptr schema,
|
||||
size_t buffer_size)
|
||||
: sstable_buffer_size(buffer_size)
|
||||
, _schema(std::move(schema))
|
||||
, _dir(std::move(dir))
|
||||
, _generation(generation)
|
||||
, _storage(std::move(dir))
|
||||
, _version(v)
|
||||
, _format(f)
|
||||
, _index_cache(std::make_unique<partition_index_cache>(
|
||||
|
||||
@@ -351,14 +351,6 @@ public:
|
||||
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
|
||||
}
|
||||
|
||||
sstring filename(component_type f) const {
|
||||
return filename(get_dir(), f);
|
||||
}
|
||||
|
||||
sstring temp_filename(component_type f) const {
|
||||
return filename(get_temp_dir(), f);
|
||||
}
|
||||
|
||||
sstring get_filename() const {
|
||||
return filename(component_type::Data);
|
||||
}
|
||||
@@ -367,12 +359,12 @@ public:
|
||||
return filename(component_type::TOC);
|
||||
}
|
||||
|
||||
static sstring sst_dir_basename(generation_type gen) {
|
||||
return fmt::format("{}.sstable", gen);
|
||||
sstring index_filename() const {
|
||||
return filename(component_type::Index);
|
||||
}
|
||||
|
||||
static sstring temp_sst_dir(const sstring& dir, generation_type gen) {
|
||||
return dir + "/" + sst_dir_basename(gen);
|
||||
static sstring sst_dir_basename(generation_type gen) {
|
||||
return fmt::format("{}.sstable", gen);
|
||||
}
|
||||
|
||||
static bool is_temp_dir(const fs::path& dirpath)
|
||||
@@ -397,11 +389,7 @@ public:
|
||||
|
||||
std::vector<std::pair<component_type, sstring>> all_components() const;
|
||||
|
||||
future<> create_links(const sstring& dir, generation_type generation) const;
|
||||
|
||||
future<> create_links(const sstring& dir) const {
|
||||
return create_links(dir, _generation);
|
||||
}
|
||||
future<> snapshot(const sstring& dir) const;
|
||||
|
||||
// Delete the sstable by unlinking all sstable files
|
||||
// Ignores all errors.
|
||||
@@ -473,19 +461,47 @@ public:
|
||||
const position_in_partition& last_partition_last_position() const noexcept {
|
||||
return _last_partition_last_position;
|
||||
}
|
||||
|
||||
using mark_for_removal = bool_class<class mark_for_removal_tag>;
|
||||
|
||||
class filesystem_storage {
|
||||
friend class test;
|
||||
sstring dir;
|
||||
std::optional<sstring> temp_dir; // Valid while the sstable is being created, until sealed
|
||||
|
||||
private:
|
||||
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
|
||||
future<> remove_temp_dir();
|
||||
future<> create_links(const sstable& sst, const sstring& dir) const;
|
||||
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal mark_for_removal) const;
|
||||
future<> touch_temp_dir(const sstable& sst);
|
||||
|
||||
public:
|
||||
explicit filesystem_storage(sstring dir_) : dir(std::move(dir_)) {}
|
||||
|
||||
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
|
||||
future<> seal(const sstable& sst);
|
||||
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const;
|
||||
future<> quarantine(const sstable& sst, delayed_commit_changes* delay);
|
||||
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay);
|
||||
// runs in async context
|
||||
void open(sstable& sst, const io_priority_class& pc);
|
||||
future<> wipe(const sstable& sst) noexcept;
|
||||
future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity);
|
||||
|
||||
sstring prefix() const { return dir; }
|
||||
};
|
||||
|
||||
private:
|
||||
sstring filename(component_type f) const {
|
||||
return filename(_storage.prefix(), f);
|
||||
}
|
||||
|
||||
sstring filename(const sstring& dir, component_type f) const {
|
||||
return filename(dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
|
||||
}
|
||||
|
||||
friend class sstable_directory;
|
||||
const sstring& get_dir() const {
|
||||
return _dir;
|
||||
}
|
||||
|
||||
const sstring get_temp_dir() const {
|
||||
return temp_sst_dir(_dir, _generation);
|
||||
}
|
||||
|
||||
size_t sstable_buffer_size;
|
||||
|
||||
@@ -523,10 +539,10 @@ private:
|
||||
lw_shared_ptr<file_input_stream_history> _index_history = make_lw_shared<file_input_stream_history>();
|
||||
|
||||
schema_ptr _schema;
|
||||
sstring _dir;
|
||||
std::optional<sstring> _temp_dir; // Valid while the sstable is being created, until sealed
|
||||
generation_type _generation{0};
|
||||
|
||||
filesystem_storage _storage;
|
||||
|
||||
version_types _version;
|
||||
format_types _format;
|
||||
|
||||
@@ -577,18 +593,14 @@ private:
|
||||
void write_crc(const checksum& c);
|
||||
void write_digest(uint32_t full_checksum);
|
||||
|
||||
future<> rename_new_sstable_component_file(sstring from_file, sstring to_file);
|
||||
future<> rename_new_sstable_component_file(sstring from_file, sstring to_file) const;
|
||||
future<file> new_sstable_component_file(const io_error_handler& error_handler, component_type f, open_flags flags, file_open_options options = {}) noexcept;
|
||||
|
||||
future<file_writer> make_component_file_writer(component_type c, file_output_stream_options options,
|
||||
open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept;
|
||||
|
||||
future<> touch_temp_dir();
|
||||
future<> remove_temp_dir();
|
||||
|
||||
void generate_toc(compressor_ptr c, double filter_fp_chance);
|
||||
void write_toc(const io_priority_class& pc);
|
||||
future<> seal_sstable();
|
||||
void generate_toc();
|
||||
void open_sstable(const io_priority_class& pc);
|
||||
|
||||
future<> read_compression(const io_priority_class& pc);
|
||||
void write_compression(const io_priority_class& pc);
|
||||
@@ -696,10 +708,6 @@ private:
|
||||
}
|
||||
|
||||
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
|
||||
|
||||
future<> check_create_links_replay(const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
|
||||
future<> create_links_common(const sstring& dst_dir, generation_type dst_gen, bool mark_for_removal) const;
|
||||
future<> create_links_and_mark_for_removal(const sstring& dst_dir, generation_type dst_gen) const;
|
||||
public:
|
||||
future<> read_toc() noexcept;
|
||||
|
||||
@@ -912,6 +920,9 @@ public:
|
||||
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
|
||||
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state) const;
|
||||
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state) const;
|
||||
|
||||
future<uint32_t> read_digest(io_priority_class pc);
|
||||
future<checksum> read_checksum(io_priority_class pc);
|
||||
};
|
||||
|
||||
// Validate checksums
|
||||
|
||||
@@ -307,12 +307,13 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
|
||||
require_exist(file_name, true);
|
||||
};
|
||||
|
||||
auto temp_sst_dir = sst::temp_sst_dir(sst_dir, generation_from_value(2));
|
||||
touch_dir(temp_sst_dir);
|
||||
auto temp_sst_dir_2 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(2));
|
||||
touch_dir(temp_sst_dir_2);
|
||||
|
||||
temp_sst_dir = sst::temp_sst_dir(sst_dir, generation_from_value(3));
|
||||
touch_dir(temp_sst_dir);
|
||||
auto temp_file_name = sst::filename(temp_sst_dir, ks, cf, sst::version_types::mc, generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC);
|
||||
auto temp_sst_dir_3 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(3));
|
||||
touch_dir(temp_sst_dir_3);
|
||||
|
||||
auto temp_file_name = sst::filename(temp_sst_dir_3, ks, cf, sst::version_types::mc, generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC);
|
||||
touch_file(temp_file_name);
|
||||
|
||||
temp_file_name = sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::TemporaryTOC);
|
||||
@@ -320,9 +321,9 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
|
||||
temp_file_name = sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::Data);
|
||||
touch_file(temp_file_name);
|
||||
|
||||
do_with_cql_env_thread([&sst_dir, &ks, &cf, &require_exist] (cql_test_env& e) {
|
||||
require_exist(sst::temp_sst_dir(sst_dir, generation_from_value(2)), false);
|
||||
require_exist(sst::temp_sst_dir(sst_dir, generation_from_value(3)), false);
|
||||
do_with_cql_env_thread([&sst_dir, &ks, &cf, &require_exist, &temp_sst_dir_2, &temp_sst_dir_3] (cql_test_env& e) {
|
||||
require_exist(temp_sst_dir_2, false);
|
||||
require_exist(temp_sst_dir_3, false);
|
||||
|
||||
require_exist(sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::TemporaryTOC), false);
|
||||
require_exist(sst::filename(sst_dir, ks, cf, sst::version_types::mc, generation_from_value(4), sst::format_types::big, component_type::Data), false);
|
||||
@@ -1160,7 +1161,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
}
|
||||
// collect all expected sstable data files
|
||||
for (auto sst : sstables) {
|
||||
expected.insert(fs::path(sst->get_filename()).filename().native());
|
||||
expected.insert(sst->component_basename(sstables::component_type::Data));
|
||||
}
|
||||
if (std::exchange(found, true)) {
|
||||
co_return;
|
||||
|
||||
@@ -2095,7 +2095,7 @@ SEASTAR_TEST_CASE(test_unknown_component) {
|
||||
auto tmp = tmpdir();
|
||||
copy_directory("test/resource/sstables/unknown_component", std::string(tmp.path().string()) + "/unknown_component");
|
||||
auto sstp = env.reusable_sst(uncompressed_schema(), tmp.path().string() + "/unknown_component", 1).get0();
|
||||
sstp->create_links(tmp.path().string()).get();
|
||||
test::create_links(*sstp, tmp.path().string()).get();
|
||||
// check that create_links() moved unknown component to new dir
|
||||
BOOST_REQUIRE(file_exists(tmp.path().string() + "/la-1-big-UNKNOWN.txt").get0());
|
||||
|
||||
@@ -3143,7 +3143,7 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
|
||||
valid = sstables::validate_checksums(sst, permit, default_priority_class()).get();
|
||||
BOOST_REQUIRE(valid);
|
||||
|
||||
auto sst_file = open_file_dma(sst->get_filename(), open_flags::wo).get();
|
||||
auto sst_file = open_file_dma(test::filename(*sst, sstables::component_type::Data).native(), open_flags::wo).get();
|
||||
auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });
|
||||
|
||||
testlog.info("Validating corrupted {}", sst->get_filename());
|
||||
@@ -3188,7 +3188,7 @@ SEASTAR_TEST_CASE(partial_sstable_deletion_test) {
|
||||
auto sst = make_sstable_containing(sst_gen, {std::move(mut1)});
|
||||
|
||||
// Rename TOC into TMP toc, to stress deletion path for partial files
|
||||
rename_file(sst->toc_filename(), sst->filename(sstables::component_type::TemporaryTOC)).get();
|
||||
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
|
||||
|
||||
sst->unlink().get();
|
||||
});
|
||||
|
||||
@@ -183,7 +183,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) {
|
||||
|
||||
// Now there is one sstable to the upload directory, but it is incomplete and one component is missing.
|
||||
// We should fail validation and leave the directory untouched
|
||||
remove_file(sst->filename(sstables::component_type::Statistics)).get();
|
||||
remove_file(test::filename(*sst, sstables::component_type::Statistics).native()).get();
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -220,7 +220,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_temporary_toc) {
|
||||
return sstables::test_env::do_with_async([] (test_env& env) {
|
||||
auto dir = tmpdir();
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
|
||||
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
|
||||
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -236,7 +236,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
|
||||
return sstables::test_env::do_with_async([] (test_env& env) {
|
||||
auto dir = tmpdir();
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
|
||||
link_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
|
||||
link_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -253,7 +253,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
|
||||
auto dir = tmpdir();
|
||||
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
|
||||
remove_file(sst->filename(sstables::component_type::TOC)).get();
|
||||
remove_file(test::filename(*sst, sstables::component_type::TOC).native()).get();
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -279,10 +279,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
|
||||
auto dir = tmpdir();
|
||||
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 1));
|
||||
auto tempstr = sst->filename(component_type::TemporaryStatistics);
|
||||
auto f = open_file_dma(tempstr, open_flags::rw | open_flags::create | open_flags::truncate).get0();
|
||||
auto tempstr = test::filename(*sst, component_type::TemporaryStatistics);
|
||||
auto f = open_file_dma(tempstr.native(), open_flags::rw | open_flags::create | open_flags::truncate).get0();
|
||||
f.close().get();
|
||||
auto tempstat = fs::canonical(fs::path(tempstr));
|
||||
auto tempstat = fs::canonical(tempstr);
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -295,7 +295,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
|
||||
}).get();
|
||||
});
|
||||
|
||||
remove_file(sst->filename(sstables::component_type::Statistics)).get();
|
||||
remove_file(test::filename(*sst, sstables::component_type::Statistics).native()).get();
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -312,7 +312,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
|
||||
auto dir = tmpdir();
|
||||
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 3333));
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 6666));
|
||||
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
|
||||
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
|
||||
|
||||
with_sstable_directory(dir.path(), 1,
|
||||
sstable_from_existing_file(env),
|
||||
@@ -415,7 +415,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) {
|
||||
// Collect all sstable file names
|
||||
sstdir.invoke_on_all([&] (sstable_directory& d) {
|
||||
return d.do_for_each_sstable([&] (sstables::shared_sstable sst) {
|
||||
sstables[this_shard_id()].push_back(sst->get_filename());
|
||||
sstables[this_shard_id()].push_back(test::filename(*sst, sstables::component_type::Data).native());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
|
||||
@@ -24,7 +24,7 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
|
||||
auto dst_path = tmp_path / src_path.filename() / format("gen-{}", gen);
|
||||
recursive_touch_directory(dst_path.native()).get();
|
||||
for (auto p : sst->all_components()) {
|
||||
auto src_path = fs::path(sst->filename(p.first));
|
||||
auto src_path = test::filename(*sst, p.first);
|
||||
copy_file(src_path, dst_path / src_path.filename());
|
||||
}
|
||||
return std::make_pair(env.reusable_sst(schema_ptr, dst_path.native(), gen).get0(), dst_path.native());
|
||||
@@ -64,14 +64,14 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
static bool partial_create_links(sstable_ptr sst, fs::path dst_path, int64_t gen, int count) {
|
||||
auto schema = sst->get_schema();
|
||||
auto tmp_toc = sstable::filename(dst_path.native(), schema->ks_name(), schema->cf_name(), sst->get_version(), generation_from_value(gen), sstable_format_types::big, component_type::TemporaryTOC);
|
||||
link_file(sst->filename(component_type::TOC), tmp_toc).get();
|
||||
link_file(test::filename(*sst, component_type::TOC).native(), tmp_toc).get();
|
||||
for (auto& [c, s] : sst->all_components()) {
|
||||
if (count-- <= 0) {
|
||||
return false;
|
||||
}
|
||||
auto src = sst->filename(c);
|
||||
auto src = test::filename(*sst, c);
|
||||
auto dst = sstable::filename(dst_path.native(), schema->ks_name(), schema->cf_name(), sst->get_version(), generation_from_value(gen), sstable_format_types::big, c);
|
||||
link_file(src, dst).get();
|
||||
link_file(src.native(), dst).get();
|
||||
}
|
||||
if (count-- <= 0) {
|
||||
return false;
|
||||
|
||||
@@ -483,7 +483,7 @@ test_sstable_exists(sstring dir, unsigned long generation, bool exists) {
|
||||
SEASTAR_TEST_CASE(statistics_rewrite) {
|
||||
return test_setup::do_with_cloned_tmp_directory(uncompressed_dir(), [] (test_env& env, sstring uncompressed_dir, sstring generation_dir) {
|
||||
return env.reusable_sst(uncompressed_schema(), uncompressed_dir, 1).then([generation_dir] (auto sstp) {
|
||||
return sstp->create_links(generation_dir).then([sstp] {});
|
||||
return test::create_links(*sstp, generation_dir).then([sstp] {});
|
||||
}).then([generation_dir] {
|
||||
return test_sstable_exists(generation_dir, 1, true);
|
||||
}).then([&env, generation_dir] {
|
||||
|
||||
@@ -139,7 +139,7 @@ public:
|
||||
}
|
||||
|
||||
void change_dir(sstring dir) {
|
||||
_sst->_dir = dir;
|
||||
_sst->_storage.dir = dir;
|
||||
}
|
||||
|
||||
void set_data_file_size(uint64_t size) {
|
||||
@@ -158,12 +158,12 @@ public:
|
||||
_sst->_recognized_components.erase(component_type::Index);
|
||||
_sst->_recognized_components.erase(component_type::Data);
|
||||
return seastar::async([sst = _sst] {
|
||||
sst->write_toc(default_priority_class());
|
||||
sst->open_sstable(default_priority_class());
|
||||
sst->write_statistics(default_priority_class());
|
||||
sst->write_compression(default_priority_class());
|
||||
sst->write_filter(default_priority_class());
|
||||
sst->write_summary(default_priority_class());
|
||||
sst->seal_sstable().get();
|
||||
sst->seal_sstable(false).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -201,8 +201,8 @@ public:
|
||||
void rewrite_toc_without_scylla_component() {
|
||||
_sst->_recognized_components.erase(component_type::Scylla);
|
||||
remove_file(_sst->filename(component_type::TOC)).get();
|
||||
_sst->write_toc(default_priority_class());
|
||||
_sst->seal_sstable().get();
|
||||
_sst->_storage.open(*_sst, default_priority_class());
|
||||
_sst->seal_sstable(false).get();
|
||||
}
|
||||
|
||||
future<> remove_component(component_type c) {
|
||||
@@ -216,6 +216,14 @@ public:
|
||||
void set_shards(std::vector<unsigned> shards) {
|
||||
_sst->_shards = std::move(shards);
|
||||
}
|
||||
|
||||
static future<> create_links(const sstable& sst, const sstring& dir) {
|
||||
return sst._storage.create_links(sst, dir);
|
||||
}
|
||||
|
||||
static fs::path filename(const sstable& sst, component_type c) {
|
||||
return fs::path(sst.filename(c));
|
||||
}
|
||||
};
|
||||
|
||||
inline auto replacer_fn_no_op() {
|
||||
|
||||
Reference in New Issue
Block a user