mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-04 22:13:19 +00:00
Merge 'Wrap sstable directory scan state in components_lister' from Pavel Emelyanov
The sstable_directory now combines two activities: * scans the list of files in /var/lib/data and generates sstable-s object from it * maintains the found sstable-s throughout necessary processing (populate/reshard/reshape) The former part is in fact storage-specific. If sstables are on a filesystem, then it should be scanned with listdir, there can be dangling files, like temp-TOC, pending deletion log and comonents not belonging to any TOCs. If sstables are on some other storage, then this part should work some other way. Said that, the sstable_directory is to be split into two pieces -- lister and "processing state". The latter would (may?) require renaming the sstable_directory into something more relevant, but that's huge and intrusive change. For now, just collect the lister stuff in one place. Closes #12843 * github.com:scylladb/scylladb: sstable_directory: Keep lister internals private sstable_directory: Move most of .commit_directory_changes() on lister sstable_directory: Remove temporary aliases sstable_directory: Move most of .process_sstable_dir() on lister sstable_directory: Move .handle_component() to components_lister sstable_directory: Keep files_for_removal on scan_state sstable_directory: Keep components_lister aboard sstable_directory: Keep scan_state on components_lister
This commit is contained in:
@@ -35,6 +35,7 @@ bool manifest_json_filter(const fs::path&, const directory_entry& entry) {
|
||||
|
||||
sstable_directory::components_lister::components_lister(std::filesystem::path dir)
|
||||
: _lister(dir, lister::dir_entry_types::of<directory_entry_type::regular>(), &manifest_json_filter)
|
||||
, _state(std::make_unique<scan_state>())
|
||||
{
|
||||
}
|
||||
|
||||
@@ -57,17 +58,17 @@ sstable_directory::sstable_directory(sstables_manager& manager,
|
||||
, _sstable_dir(std::move(sstable_dir))
|
||||
, _io_priority(std::move(io_prio))
|
||||
, _error_handler_gen(error_handler_gen)
|
||||
, _lister(_manager.get_components_lister(_sstable_dir))
|
||||
, _unshared_remote_sstables(smp::count)
|
||||
{}
|
||||
|
||||
void
|
||||
sstable_directory::handle_component(scan_state& state, sstables::entry_descriptor desc, fs::path filename) {
|
||||
void sstable_directory::components_lister::handle(sstables::entry_descriptor desc, fs::path filename) {
|
||||
if ((generation_value(desc.generation) % smp::count) != this_shard_id()) {
|
||||
return;
|
||||
}
|
||||
|
||||
dirlog.trace("for SSTable directory, scanning {}", filename);
|
||||
state.generations_found.emplace(desc.generation, filename);
|
||||
_state->generations_found.emplace(desc.generation, filename);
|
||||
|
||||
switch (desc.component) {
|
||||
case component_type::TemporaryStatistics:
|
||||
@@ -75,13 +76,13 @@ sstable_directory::handle_component(scan_state& state, sstables::entry_descripto
|
||||
// for instance on mutate_level. We should delete it - so we mark it for deletion
|
||||
// here, but just the component. The old statistics file should still be there
|
||||
// and we'll go with it.
|
||||
_files_for_removal.insert(filename.native());
|
||||
_state->files_for_removal.insert(filename.native());
|
||||
break;
|
||||
case component_type::TOC:
|
||||
state.descriptors.emplace(desc.generation, std::move(desc));
|
||||
_state->descriptors.emplace(desc.generation, std::move(desc));
|
||||
break;
|
||||
case component_type::TemporaryTOC:
|
||||
state.temp_toc_found.push_back(std::move(desc));
|
||||
_state->temp_toc_found.push_back(std::move(desc));
|
||||
break;
|
||||
default:
|
||||
// Do nothing, and will validate when trying to load the file.
|
||||
@@ -164,10 +165,12 @@ sstable_directory::highest_version_seen() const {
|
||||
return _max_version_seen;
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable_directory::process_sstable_dir(process_flags flags) {
|
||||
future<> sstable_directory::process_sstable_dir(process_flags flags) {
|
||||
dirlog.debug("Start processing directory {} for SSTables", _sstable_dir);
|
||||
return _lister->process(*this, _sstable_dir, flags);
|
||||
}
|
||||
|
||||
future<> sstable_directory::components_lister::process(sstable_directory& directory, fs::path location, process_flags flags) {
|
||||
// It seems wasteful that each shard is repeating this scan, and to some extent it is.
|
||||
// However, we still want to open the files and especially call process_dir() in a distributed
|
||||
// fashion not to overload any shard. Also in the common case the SSTables will all be
|
||||
@@ -180,25 +183,22 @@ sstable_directory::process_sstable_dir(process_flags flags) {
|
||||
// - If all shards scan in parallel, they can start loading sooner. That is faster than having
|
||||
// a separate step to fetch all files, followed by another step to distribute and process.
|
||||
|
||||
scan_state state;
|
||||
|
||||
auto sstable_dir_lister = _manager.get_components_lister(_sstable_dir);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
while (true) {
|
||||
sstring name = co_await sstable_dir_lister.get();
|
||||
sstring name = co_await get();
|
||||
if (name == "") {
|
||||
break;
|
||||
}
|
||||
auto comps = sstables::entry_descriptor::make_descriptor(_sstable_dir.native(), name);
|
||||
handle_component(state, std::move(comps), _sstable_dir / name);
|
||||
auto comps = sstables::entry_descriptor::make_descriptor(location.native(), name);
|
||||
handle(std::move(comps), location / name);
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await sstable_dir_lister.close();
|
||||
co_await close();
|
||||
if (ex) {
|
||||
dirlog.debug("Could not process sstable directory {}: {}", _sstable_dir, ex);
|
||||
dirlog.debug("Could not process sstable directory {}: {}", location, ex);
|
||||
// FIXME: waiting for https://github.com/scylladb/seastar/pull/1090
|
||||
// co_await coroutine::return_exception(std::move(ex));
|
||||
std::rethrow_exception(std::move(ex));
|
||||
@@ -207,51 +207,54 @@ sstable_directory::process_sstable_dir(process_flags flags) {
|
||||
// Always okay to delete files with a temporary TOC. We want to do it before we process
|
||||
// the generations seen: it's okay to reuse those generations since the files will have
|
||||
// been deleted anyway.
|
||||
for (auto& desc: state.temp_toc_found) {
|
||||
auto range = state.generations_found.equal_range(desc.generation);
|
||||
for (auto& desc: _state->temp_toc_found) {
|
||||
auto range = _state->generations_found.equal_range(desc.generation);
|
||||
for (auto it = range.first; it != range.second; ++it) {
|
||||
auto& path = it->second;
|
||||
dirlog.trace("Scheduling to remove file {}, from an SSTable with a Temporary TOC", path.native());
|
||||
_files_for_removal.insert(path.native());
|
||||
_state->files_for_removal.insert(path.native());
|
||||
}
|
||||
state.generations_found.erase(range.first, range.second);
|
||||
state.descriptors.erase(desc.generation);
|
||||
_state->generations_found.erase(range.first, range.second);
|
||||
_state->descriptors.erase(desc.generation);
|
||||
}
|
||||
|
||||
_max_generation_seen = boost::accumulate(state.generations_found | boost::adaptors::map_keys, generation_from_value(0), [] (generation_type a, generation_type b) {
|
||||
directory._max_generation_seen = boost::accumulate(_state->generations_found | boost::adaptors::map_keys, generation_from_value(0), [] (generation_type a, generation_type b) {
|
||||
return std::max<generation_type>(a, b);
|
||||
});
|
||||
|
||||
dirlog.debug("After {} scanned, seen generation {}. {} descriptors found, {} different files found ",
|
||||
_sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size());
|
||||
location, directory._max_generation_seen, _state->descriptors.size(), _state->generations_found.size());
|
||||
|
||||
// _descriptors is everything with a TOC. So after we remove this, what's left is
|
||||
// SSTables for which a TOC was not found.
|
||||
co_await parallel_for_each_restricted(state.descriptors, [this, flags, &state] (std::tuple<generation_type, sstables::entry_descriptor>&& t) {
|
||||
co_await directory.parallel_for_each_restricted(_state->descriptors, [this, flags, &directory] (std::tuple<generation_type, sstables::entry_descriptor>&& t) {
|
||||
auto& desc = std::get<1>(t);
|
||||
state.generations_found.erase(desc.generation);
|
||||
_state->generations_found.erase(desc.generation);
|
||||
// This will try to pre-load this file and throw an exception if it is invalid
|
||||
return process_descriptor(std::move(desc), flags);
|
||||
return directory.process_descriptor(std::move(desc), flags);
|
||||
});
|
||||
|
||||
// For files missing TOC, it depends on where this is coming from.
|
||||
// If scylla was supposed to have generated this SSTable, this is not okay and
|
||||
// we refuse to proceed. If this coming from, say, an import, then we just delete,
|
||||
// log and proceed.
|
||||
for (auto& path : state.generations_found | boost::adaptors::map_values) {
|
||||
for (auto& path : _state->generations_found | boost::adaptors::map_values) {
|
||||
if (flags.throw_on_missing_toc) {
|
||||
throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable {}!. Refusing to boot", _sstable_dir.native(), path.native()));
|
||||
throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable {}!. Refusing to boot", location.native(), path.native()));
|
||||
} else {
|
||||
dirlog.info("Found incomplete SSTable {} at directory {}. Removing", path.native(), _sstable_dir.native());
|
||||
_files_for_removal.insert(path.native());
|
||||
dirlog.info("Found incomplete SSTable {} at directory {}. Removing", path.native(), location.native());
|
||||
_state->files_for_removal.insert(path.native());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable_directory::commit_directory_changes() {
|
||||
future<> sstable_directory::commit_directory_changes() {
|
||||
return _lister->commit().finally([x = std::move(_lister)] {});
|
||||
}
|
||||
|
||||
future<> sstable_directory::components_lister::commit() {
|
||||
// Remove all files scheduled for removal
|
||||
return parallel_for_each(std::exchange(_files_for_removal, {}), [] (sstring path) {
|
||||
return parallel_for_each(std::exchange(_state->files_for_removal, {}), [] (sstring path) {
|
||||
dirlog.info("Removing file {}", path);
|
||||
return remove_file(std::move(path));
|
||||
});
|
||||
|
||||
@@ -65,29 +65,38 @@ public:
|
||||
};
|
||||
|
||||
class components_lister {
|
||||
struct scan_state {
|
||||
using scan_multimap = std::unordered_multimap<generation_type, std::filesystem::path>;
|
||||
using scan_descriptors = utils::chunked_vector<sstables::entry_descriptor>;
|
||||
using scan_descriptors_map = std::unordered_map<generation_type, sstables::entry_descriptor>;
|
||||
|
||||
scan_multimap generations_found;
|
||||
scan_descriptors temp_toc_found;
|
||||
scan_descriptors_map descriptors;
|
||||
|
||||
// SSTable files to be deleted: things with a Temporary TOC, missing TOC files,
|
||||
// TemporaryStatistics, etc. Not part of the scan state, because we want to do a 2-phase
|
||||
// delete: maybe one of the shards will have signaled an error. And in the case of an error
|
||||
// we don't want to delete anything.
|
||||
std::unordered_set<sstring> files_for_removal;
|
||||
};
|
||||
|
||||
void handle(sstables::entry_descriptor desc, std::filesystem::path filename);
|
||||
|
||||
directory_lister _lister;
|
||||
public:
|
||||
std::unique_ptr<scan_state> _state;
|
||||
|
||||
future<sstring> get();
|
||||
components_lister(std::filesystem::path dir);
|
||||
future<> close();
|
||||
|
||||
public:
|
||||
components_lister(std::filesystem::path dir);
|
||||
|
||||
future<> process(sstable_directory& directory, fs::path location, process_flags flags);
|
||||
future<> commit();
|
||||
};
|
||||
|
||||
private:
|
||||
using scan_multimap = std::unordered_multimap<generation_type, std::filesystem::path>;
|
||||
using scan_descriptors = utils::chunked_vector<sstables::entry_descriptor>;
|
||||
using scan_descriptors_map = std::unordered_map<generation_type, sstables::entry_descriptor>;
|
||||
|
||||
struct scan_state {
|
||||
scan_multimap generations_found;
|
||||
scan_descriptors temp_toc_found;
|
||||
scan_descriptors_map descriptors;
|
||||
};
|
||||
|
||||
// SSTable files to be deleted: things with a Temporary TOC, missing TOC files,
|
||||
// TemporaryStatistics, etc. Not part of the scan state, because we want to do a 2-phase
|
||||
// delete: maybe one of the shards will have signaled an error. And in the case of an error
|
||||
// we don't want to delete anything.
|
||||
std::unordered_set<sstring> _files_for_removal;
|
||||
|
||||
// prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation.
|
||||
// Will be destroyed when this object is destroyed.
|
||||
@@ -98,6 +107,7 @@ private:
|
||||
std::filesystem::path _sstable_dir;
|
||||
::io_priority_class _io_priority;
|
||||
io_error_handler_gen _error_handler_gen;
|
||||
std::unique_ptr<components_lister> _lister;
|
||||
|
||||
generation_type _max_generation_seen = generation_from_value(0);
|
||||
sstables::sstable_version_types _max_version_seen = sstables::sstable_version_types::ka;
|
||||
@@ -121,7 +131,6 @@ private:
|
||||
|
||||
future<> process_descriptor(sstables::entry_descriptor desc, process_flags flags);
|
||||
void validate(sstables::shared_sstable sst, process_flags flags) const;
|
||||
void handle_component(scan_state& state, sstables::entry_descriptor desc, std::filesystem::path filename);
|
||||
|
||||
template <typename Container, typename Func>
|
||||
future<> parallel_for_each_restricted(Container&& C, Func&& func);
|
||||
|
||||
@@ -107,8 +107,8 @@ future<> sstables_manager::close() {
|
||||
co_await _sstable_metadata_concurrency_sem.stop();
|
||||
}
|
||||
|
||||
sstable_directory::components_lister sstables_manager::get_components_lister(std::filesystem::path dir) {
|
||||
return sstable_directory::components_lister(std::move(dir));
|
||||
std::unique_ptr<sstable_directory::components_lister> sstables_manager::get_components_lister(std::filesystem::path dir) {
|
||||
return std::make_unique<sstable_directory::components_lister>(std::move(dir));
|
||||
}
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -85,7 +85,7 @@ public:
|
||||
io_error_handler_gen error_handler_gen = default_io_error_handler_gen(),
|
||||
size_t buffer_size = default_sstable_buffer_size);
|
||||
|
||||
sstable_directory::components_lister get_components_lister(std::filesystem::path dir);
|
||||
std::unique_ptr<sstable_directory::components_lister> get_components_lister(std::filesystem::path dir);
|
||||
|
||||
virtual sstable_writer_config configure_writer(sstring origin) const;
|
||||
const db::config& config() const { return _db_config; }
|
||||
|
||||
Reference in New Issue
Block a user