diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 58ddc16bba..916752f84f 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -35,6 +35,7 @@ bool manifest_json_filter(const fs::path&, const directory_entry& entry) { sstable_directory::components_lister::components_lister(std::filesystem::path dir) : _lister(dir, lister::dir_entry_types::of(), &manifest_json_filter) + , _state(std::make_unique()) { } @@ -57,17 +58,17 @@ sstable_directory::sstable_directory(sstables_manager& manager, , _sstable_dir(std::move(sstable_dir)) , _io_priority(std::move(io_prio)) , _error_handler_gen(error_handler_gen) + , _lister(_manager.get_components_lister(_sstable_dir)) , _unshared_remote_sstables(smp::count) {} -void -sstable_directory::handle_component(scan_state& state, sstables::entry_descriptor desc, fs::path filename) { +void sstable_directory::components_lister::handle(sstables::entry_descriptor desc, fs::path filename) { if ((generation_value(desc.generation) % smp::count) != this_shard_id()) { return; } dirlog.trace("for SSTable directory, scanning {}", filename); - state.generations_found.emplace(desc.generation, filename); + _state->generations_found.emplace(desc.generation, filename); switch (desc.component) { case component_type::TemporaryStatistics: @@ -75,13 +76,13 @@ sstable_directory::handle_component(scan_state& state, sstables::entry_descripto // for instance on mutate_level. We should delete it - so we mark it for deletion // here, but just the component. The old statistics file should still be there // and we'll go with it. 
- _files_for_removal.insert(filename.native()); + _state->files_for_removal.insert(filename.native()); break; case component_type::TOC: - state.descriptors.emplace(desc.generation, std::move(desc)); + _state->descriptors.emplace(desc.generation, std::move(desc)); break; case component_type::TemporaryTOC: - state.temp_toc_found.push_back(std::move(desc)); + _state->temp_toc_found.push_back(std::move(desc)); break; default: // Do nothing, and will validate when trying to load the file. @@ -164,10 +165,12 @@ sstable_directory::highest_version_seen() const { return _max_version_seen; } -future<> -sstable_directory::process_sstable_dir(process_flags flags) { +future<> sstable_directory::process_sstable_dir(process_flags flags) { dirlog.debug("Start processing directory {} for SSTables", _sstable_dir); + return _lister->process(*this, _sstable_dir, flags); +} +future<> sstable_directory::components_lister::process(sstable_directory& directory, fs::path location, process_flags flags) { // It seems wasteful that each shard is repeating this scan, and to some extent it is. // However, we still want to open the files and especially call process_dir() in a distributed // fashion not to overload any shard. Also in the common case the SSTables will all be @@ -180,25 +183,22 @@ sstable_directory::process_sstable_dir(process_flags flags) { // - If all shards scan in parallel, they can start loading sooner. That is faster than having // a separate step to fetch all files, followed by another step to distribute and process. 
- scan_state state; - - auto sstable_dir_lister = _manager.get_components_lister(_sstable_dir); std::exception_ptr ex; try { while (true) { - sstring name = co_await sstable_dir_lister.get(); + sstring name = co_await get(); if (name == "") { break; } - auto comps = sstables::entry_descriptor::make_descriptor(_sstable_dir.native(), name); - handle_component(state, std::move(comps), _sstable_dir / name); + auto comps = sstables::entry_descriptor::make_descriptor(location.native(), name); + handle(std::move(comps), location / name); } } catch (...) { ex = std::current_exception(); } - co_await sstable_dir_lister.close(); + co_await close(); if (ex) { - dirlog.debug("Could not process sstable directory {}: {}", _sstable_dir, ex); + dirlog.debug("Could not process sstable directory {}: {}", location, ex); // FIXME: waiting for https://github.com/scylladb/seastar/pull/1090 // co_await coroutine::return_exception(std::move(ex)); std::rethrow_exception(std::move(ex)); @@ -207,51 +207,54 @@ sstable_directory::process_sstable_dir(process_flags flags) { // Always okay to delete files with a temporary TOC. We want to do it before we process // the generations seen: it's okay to reuse those generations since the files will have // been deleted anyway. 
- for (auto& desc: state.temp_toc_found) { - auto range = state.generations_found.equal_range(desc.generation); + for (auto& desc: _state->temp_toc_found) { + auto range = _state->generations_found.equal_range(desc.generation); for (auto it = range.first; it != range.second; ++it) { auto& path = it->second; dirlog.trace("Scheduling to remove file {}, from an SSTable with a Temporary TOC", path.native()); - _files_for_removal.insert(path.native()); + _state->files_for_removal.insert(path.native()); } - state.generations_found.erase(range.first, range.second); - state.descriptors.erase(desc.generation); + _state->generations_found.erase(range.first, range.second); + _state->descriptors.erase(desc.generation); } - _max_generation_seen = boost::accumulate(state.generations_found | boost::adaptors::map_keys, generation_from_value(0), [] (generation_type a, generation_type b) { + directory._max_generation_seen = boost::accumulate(_state->generations_found | boost::adaptors::map_keys, generation_from_value(0), [] (generation_type a, generation_type b) { return std::max(a, b); }); dirlog.debug("After {} scanned, seen generation {}. {} descriptors found, {} different files found ", - _sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size()); + location, directory._max_generation_seen, _state->descriptors.size(), _state->generations_found.size()); // _descriptors is everything with a TOC. So after we remove this, what's left is // SSTables for which a TOC was not found. 
- co_await parallel_for_each_restricted(state.descriptors, [this, flags, &state] (std::tuple&& t) { + co_await directory.parallel_for_each_restricted(_state->descriptors, [this, flags, &directory] (std::tuple&& t) { auto& desc = std::get<1>(t); - state.generations_found.erase(desc.generation); + _state->generations_found.erase(desc.generation); // This will try to pre-load this file and throw an exception if it is invalid - return process_descriptor(std::move(desc), flags); + return directory.process_descriptor(std::move(desc), flags); }); // For files missing TOC, it depends on where this is coming from. // If scylla was supposed to have generated this SSTable, this is not okay and // we refuse to proceed. If this coming from, say, an import, then we just delete, // log and proceed. - for (auto& path : state.generations_found | boost::adaptors::map_values) { + for (auto& path : _state->generations_found | boost::adaptors::map_values) { if (flags.throw_on_missing_toc) { - throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable {}!. Refusing to boot", _sstable_dir.native(), path.native())); + throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable {}!. Refusing to boot", location.native(), path.native())); } else { - dirlog.info("Found incomplete SSTable {} at directory {}. Removing", path.native(), _sstable_dir.native()); - _files_for_removal.insert(path.native()); + dirlog.info("Found incomplete SSTable {} at directory {}. 
Removing", path.native(), location.native()); + _state->files_for_removal.insert(path.native()); } } } -future<> -sstable_directory::commit_directory_changes() { +future<> sstable_directory::commit_directory_changes() { + return _lister->commit().finally([x = std::move(_lister)] {}); +} + +future<> sstable_directory::components_lister::commit() { // Remove all files scheduled for removal - return parallel_for_each(std::exchange(_files_for_removal, {}), [] (sstring path) { + return parallel_for_each(std::exchange(_state->files_for_removal, {}), [] (sstring path) { dirlog.info("Removing file {}", path); return remove_file(std::move(path)); }); diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 4447cf063a..eed7af55bf 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -65,29 +65,38 @@ public: }; class components_lister { + struct scan_state { + using scan_multimap = std::unordered_multimap; + using scan_descriptors = utils::chunked_vector; + using scan_descriptors_map = std::unordered_map; + + scan_multimap generations_found; + scan_descriptors temp_toc_found; + scan_descriptors_map descriptors; + + // SSTable files to be deleted: things with a Temporary TOC, missing TOC files, + // TemporaryStatistics, etc. They are only collected here while scanning and are removed + // later by commit(), because we want to do a 2-phase delete: maybe one of the shards will + // have signaled an error. And in the case of an error we don't want to delete anything. 
+ std::unordered_set files_for_removal; + }; + + void handle(sstables::entry_descriptor desc, std::filesystem::path filename); + directory_lister _lister; - public: + std::unique_ptr _state; + future get(); - components_lister(std::filesystem::path dir); future<> close(); + + public: + components_lister(std::filesystem::path dir); + + future<> process(sstable_directory& directory, fs::path location, process_flags flags); + future<> commit(); }; private: - using scan_multimap = std::unordered_multimap; - using scan_descriptors = utils::chunked_vector; - using scan_descriptors_map = std::unordered_map; - - struct scan_state { - scan_multimap generations_found; - scan_descriptors temp_toc_found; - scan_descriptors_map descriptors; - }; - - // SSTable files to be deleted: things with a Temporary TOC, missing TOC files, - // TemporaryStatistics, etc. Not part of the scan state, because we want to do a 2-phase - // delete: maybe one of the shards will have signaled an error. And in the case of an error - // we don't want to delete anything. - std::unordered_set _files_for_removal; // prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation. // Will be destroyed when this object is destroyed. 
@@ -98,6 +107,7 @@ private: std::filesystem::path _sstable_dir; ::io_priority_class _io_priority; io_error_handler_gen _error_handler_gen; + std::unique_ptr _lister; generation_type _max_generation_seen = generation_from_value(0); sstables::sstable_version_types _max_version_seen = sstables::sstable_version_types::ka; @@ -121,7 +131,6 @@ private: future<> process_descriptor(sstables::entry_descriptor desc, process_flags flags); void validate(sstables::shared_sstable sst, process_flags flags) const; - void handle_component(scan_state& state, sstables::entry_descriptor desc, std::filesystem::path filename); template future<> parallel_for_each_restricted(Container&& C, Func&& func); diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index 985906db0d..b0246efc12 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -107,8 +107,8 @@ future<> sstables_manager::close() { co_await _sstable_metadata_concurrency_sem.stop(); } -sstable_directory::components_lister sstables_manager::get_components_lister(std::filesystem::path dir) { - return sstable_directory::components_lister(std::move(dir)); +std::unique_ptr sstables_manager::get_components_lister(std::filesystem::path dir) { + return std::make_unique(std::move(dir)); } } // namespace sstables diff --git a/sstables/sstables_manager.hh b/sstables/sstables_manager.hh index 53c141b299..755fd39803 100644 --- a/sstables/sstables_manager.hh +++ b/sstables/sstables_manager.hh @@ -85,7 +85,7 @@ public: io_error_handler_gen error_handler_gen = default_io_error_handler_gen(), size_t buffer_size = default_sstable_buffer_size); - sstable_directory::components_lister get_components_lister(std::filesystem::path dir); + std::unique_ptr get_components_lister(std::filesystem::path dir); virtual sstable_writer_config configure_writer(sstring origin) const; const db::config& config() const { return _db_config; }