sstable: make all parsing of simple components go through do_read_simple()
With all parsing of simple components going through do_read_simple(), common infrastructure can be reused (exception handling, debug logging, etc), and also statistics spanning all components can be easily added. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -875,9 +875,7 @@ future<> sstable::read_toc() noexcept {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
sstlog.debug("Reading TOC file {}", filename(component_type::TOC));
|
||||
|
||||
return with_file(new_sstable_component_file(_read_error_handler, component_type::TOC, open_flags::ro), [this] (file f) {
|
||||
return do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> {
|
||||
auto bufptr = allocate_aligned_buffer<char>(4096, 4096);
|
||||
auto buf = bufptr.get();
|
||||
|
||||
@@ -1083,26 +1081,15 @@ void sstable::write_digest(uint32_t full_checksum) {
|
||||
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache;
|
||||
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_original_index_cache;
|
||||
|
||||
|
||||
template <component_type Type, typename T>
|
||||
future<> sstable::read_simple(T& component, const io_priority_class& pc) {
|
||||
|
||||
auto file_path = filename(Type);
|
||||
sstlog.debug(("Reading " + sstable_version_constants::get_component_map(_version).at(Type) + " file {} ").c_str(), file_path);
|
||||
future<> sstable::do_read_simple(component_type type,
|
||||
noncopyable_function<future<> (version_types, file&&, uint64_t sz)> read_component) {
|
||||
auto file_path = filename(type);
|
||||
sstlog.debug(("Reading " + sstable_version_constants::get_component_map(_version).at(type) + " file {} ").c_str(), file_path);
|
||||
try {
|
||||
file fi = co_await new_sstable_component_file(_read_error_handler, Type, open_flags::ro);
|
||||
file fi = co_await new_sstable_component_file(_read_error_handler, type, open_flags::ro);
|
||||
uint64_t size = co_await fi.size();
|
||||
|
||||
auto r = file_random_access_reader(std::move(fi), size, sstable_buffer_size);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await parse(*_schema, _version, r, component);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await r.close();
|
||||
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
co_await read_component(_version, std::move(fi), size);
|
||||
} catch (std::system_error& e) {
|
||||
if (e.code() == std::error_code(ENOENT, std::system_category())) {
|
||||
throw malformed_sstable_exception(file_path + ": file not found");
|
||||
@@ -1113,6 +1100,37 @@ future<> sstable::read_simple(T& component, const io_priority_class& pc) {
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable::do_read_simple(component_type type,
|
||||
noncopyable_function<future<> (version_types, file)> read_component) {
|
||||
return do_read_simple(type, [this, read_component = std::move(read_component)] (version_types v, file&& f, uint64_t) -> future<> {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await read_component(_version, f);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await f.close();
|
||||
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
});
|
||||
}
|
||||
|
||||
template <component_type Type, typename T>
|
||||
future<> sstable::read_simple(T& component, const io_priority_class& pc) {
|
||||
return do_read_simple(Type, [&] (version_types v, file&& f, uint64_t size) -> future<> {
|
||||
std::exception_ptr ex;
|
||||
auto r = file_random_access_reader(std::move(f), size, sstable_buffer_size);
|
||||
try {
|
||||
co_await parse(*_schema, v, r, component);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await r.close();
|
||||
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
});
|
||||
}
|
||||
|
||||
void sstable::do_write_simple(file_writer&& writer,
|
||||
noncopyable_function<void (version_types, file_writer&)> write_component) {
|
||||
write_component(_version, writer);
|
||||
@@ -2632,16 +2650,19 @@ static future<bool> do_validate_uncompressed(input_stream<char>& stream, const c
|
||||
}
|
||||
|
||||
future<uint32_t> sstable::read_digest(io_priority_class pc) {
|
||||
sstring digest_str;
|
||||
|
||||
/// FIXME: restore indentation
|
||||
co_await do_read_simple(component_type::Digest, [&] (version_types v, file digest_file) -> future<> {
|
||||
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
options.io_priority_class = pc;
|
||||
|
||||
auto digest_file = co_await open_file_dma(filename(component_type::Digest), open_flags::ro);
|
||||
auto digest_stream = make_file_input_stream(std::move(digest_file), options);
|
||||
|
||||
std::exception_ptr ex;
|
||||
|
||||
sstring digest_str;
|
||||
try {
|
||||
digest_str = co_await util::read_entire_stream_contiguous(digest_stream);
|
||||
} catch (...) {
|
||||
@@ -2651,18 +2672,23 @@ future<uint32_t> sstable::read_digest(io_priority_class pc) {
|
||||
co_await digest_stream.close();
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
|
||||
});
|
||||
|
||||
co_return boost::lexical_cast<uint32_t>(digest_str);
|
||||
}
|
||||
|
||||
future<checksum> sstable::read_checksum(io_priority_class pc) {
|
||||
sstables::checksum checksum;
|
||||
|
||||
// FIXME: restore indentation
|
||||
co_await do_read_simple(component_type::CRC, [&] (version_types v, file crc_file) -> future<> {
|
||||
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
options.io_priority_class = pc;
|
||||
|
||||
auto crc_file = co_await open_file_dma(filename(component_type::CRC), open_flags::ro);
|
||||
auto crc_stream = make_file_input_stream(std::move(crc_file), options);
|
||||
|
||||
checksum checksum;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
@@ -2685,6 +2711,8 @@ future<checksum> sstable::read_checksum(io_priority_class pc) {
|
||||
co_await crc_stream.close();
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
|
||||
});
|
||||
|
||||
co_return checksum;
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,8 @@ class large_data_handler;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class random_access_reader;
|
||||
|
||||
class sstable_directory;
|
||||
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
|
||||
|
||||
@@ -596,6 +598,11 @@ private:
|
||||
|
||||
template <component_type Type, typename T>
|
||||
future<> read_simple(T& comp, const io_priority_class& pc);
|
||||
future<> do_read_simple(component_type type,
|
||||
noncopyable_function<future<> (version_types, file&&, uint64_t sz)> read_component);
|
||||
// this variant closes the file on parse completion
|
||||
future<> do_read_simple(component_type type,
|
||||
noncopyable_function<future<> (version_types, file)> read_component);
|
||||
|
||||
template <component_type Type, typename T>
|
||||
void write_simple(const T& comp, const io_priority_class& pc);
|
||||
|
||||
Reference in New Issue
Block a user