diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 884d860863..f5d6874902 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -875,9 +875,7 @@ future<> sstable::read_toc() noexcept { return make_ready_future<>(); } - sstlog.debug("Reading TOC file {}", filename(component_type::TOC)); - - return with_file(new_sstable_component_file(_read_error_handler, component_type::TOC, open_flags::ro), [this] (file f) { + return do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> { auto bufptr = allocate_aligned_buffer(4096, 4096); auto buf = bufptr.get(); @@ -1083,26 +1081,15 @@ void sstable::write_digest(uint32_t full_checksum) { thread_local std::array, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache; thread_local std::array, downsampling::BASE_SAMPLING_LEVEL> downsampling::_original_index_cache; - -template -future<> sstable::read_simple(T& component, const io_priority_class& pc) { - - auto file_path = filename(Type); - sstlog.debug(("Reading " + sstable_version_constants::get_component_map(_version).at(Type) + " file {} ").c_str(), file_path); +future<> sstable::do_read_simple(component_type type, + noncopyable_function (version_types, file&&, uint64_t sz)> read_component) { + auto file_path = filename(type); + sstlog.debug(("Reading " + sstable_version_constants::get_component_map(_version).at(type) + " file {} ").c_str(), file_path); try { - file fi = co_await new_sstable_component_file(_read_error_handler, Type, open_flags::ro); + file fi = co_await new_sstable_component_file(_read_error_handler, type, open_flags::ro); uint64_t size = co_await fi.size(); - auto r = file_random_access_reader(std::move(fi), size, sstable_buffer_size); - std::exception_ptr ex; - try { - co_await parse(*_schema, _version, r, component); - } catch (...) { - ex = std::current_exception(); - } - co_await r.close(); - - maybe_rethrow_exception(std::move(ex)); + co_await read_component(_version, std::move(fi), size); } catch (std::system_error& e) { if (e.code() == std::error_code(ENOENT, std::system_category())) { throw malformed_sstable_exception(file_path + ": file not found"); @@ -1113,6 +1100,37 @@ future<> sstable::read_simple(T& component, const io_priority_class& pc) { } } +future<> sstable::do_read_simple(component_type type, + noncopyable_function (version_types, file)> read_component) { + return do_read_simple(type, [this, read_component = std::move(read_component)] (version_types v, file&& f, uint64_t) -> future<> { + std::exception_ptr ex; + try { + co_await read_component(_version, f); + } catch (...) { + ex = std::current_exception(); + } + co_await f.close(); + + maybe_rethrow_exception(std::move(ex)); + }); +} + +template +future<> sstable::read_simple(T& component, const io_priority_class& pc) { + return do_read_simple(Type, [&] (version_types v, file&& f, uint64_t size) -> future<> { + std::exception_ptr ex; + auto r = file_random_access_reader(std::move(f), size, sstable_buffer_size); + try { + co_await parse(*_schema, v, r, component); + } catch (...) { + ex = std::current_exception(); + } + co_await r.close(); + + maybe_rethrow_exception(std::move(ex)); + }); +} + void sstable::do_write_simple(file_writer&& writer, noncopyable_function write_component) { write_component(_version, writer); @@ -2632,16 +2650,19 @@ static future do_validate_uncompressed(input_stream& stream, const c } future sstable::read_digest(io_priority_class pc) { + sstring digest_str; + + /// FIXME: restore indentation + co_await do_read_simple(component_type::Digest, [&] (version_types v, file digest_file) -> future<> { + file_input_stream_options options; options.buffer_size = 4096; options.io_priority_class = pc; - auto digest_file = co_await open_file_dma(filename(component_type::Digest), open_flags::ro); auto digest_stream = make_file_input_stream(std::move(digest_file), options); std::exception_ptr ex; - sstring digest_str; try { digest_str = co_await util::read_entire_stream_contiguous(digest_stream); } catch (...) { @@ -2651,18 +2672,23 @@ future sstable::read_digest(io_priority_class pc) { co_await digest_stream.close(); maybe_rethrow_exception(std::move(ex)); + }); + co_return boost::lexical_cast(digest_str); } future sstable::read_checksum(io_priority_class pc) { + sstables::checksum checksum; + + // FIXME: restore indentation + co_await do_read_simple(component_type::CRC, [&] (version_types v, file crc_file) -> future<> { + file_input_stream_options options; options.buffer_size = 4096; options.io_priority_class = pc; - auto crc_file = co_await open_file_dma(filename(component_type::CRC), open_flags::ro); auto crc_stream = make_file_input_stream(std::move(crc_file), options); - checksum checksum; std::exception_ptr ex; try { @@ -2685,6 +2711,8 @@ future sstable::read_checksum(io_priority_class pc) { co_await crc_stream.close(); maybe_rethrow_exception(std::move(ex)); + }); + co_return checksum; } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7450c626cd..2d078b8545 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -54,6 +54,8 @@ class large_data_handler; namespace sstables { +class random_access_reader; + class sstable_directory; extern thread_local utils::updateable_value global_cache_index_pages; @@ -596,6 +598,11 @@ private: template future<> read_simple(T& comp, const io_priority_class& pc); + future<> do_read_simple(component_type type, + noncopyable_function (version_types, file&&, uint64_t sz)> read_component); + // this variant closes the file on parse completion + future<> do_read_simple(component_type type, + noncopyable_function (version_types, file)> read_component); template void write_simple(const T& comp, const io_priority_class& pc);