/* * Copyright (C) 2020-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include "sstables/random_access_reader.hh" #include "utils/disk-error-handler.hh" #include "utils/log.hh" namespace sstables { extern logging::logger sstlog; future > random_access_reader::read_exactly(size_t n) noexcept { try { return _in->read_exactly(n); } catch (...) { return current_exception_as_future>(); } } static future<> close_if_needed(std::unique_ptr> in) { if (!in) { return make_ready_future<>(); } return in->close().finally([in = std::move(in)] {}); } future<> random_access_reader::seek(uint64_t pos) noexcept { try { auto tmp = std::make_unique>(open_at(pos)); std::swap(tmp, _in); return close_if_needed(std::move(tmp)); } catch (...) { return current_exception_as_future(); } } future<> random_access_reader::close() noexcept { return futurize_invoke(close_if_needed, std::move(_in)); } file_random_access_reader::file_random_access_reader(file f, uint64_t file_size, size_t buffer_size, unsigned read_ahead) : _file(std::move(f)), _buffer_size(buffer_size), _read_ahead(read_ahead), _file_size(file_size) { set(open_at(0)); } input_stream file_random_access_reader::open_at(uint64_t pos) { auto len = _file_size - pos; file_input_stream_options options; options.buffer_size = _buffer_size; options.read_ahead = _read_ahead; return make_file_input_stream(_file, pos, len, std::move(options)); } future<> file_random_access_reader::close() noexcept { return random_access_reader::close().finally([this] { return _file.close().handle_exception([save = _file](auto ep) { sstlog.warn("sstable close failed: {}", ep); general_disk_error(); }); }); } digest_file_random_access_reader::digest_file_random_access_reader(file f, uint64_t file_size, size_t buffer_size, unsigned read_ahead) : file_random_access_reader(std::move(f), file_size, buffer_size, read_ahead) { _digest = crc32_utils::init_checksum(); } future> digest_file_random_access_reader::read_exactly(size_t n) noexcept { return random_access_reader::read_exactly(n).then([this] (temporary_buffer buf) { _pos += buf.size(); if (_digest) { uint32_t per_chunk_checksum = crc32_utils::init_checksum(); per_chunk_checksum = crc32_utils::checksum(per_chunk_checksum, buf.begin(), buf.size()); _digest = checksum_combine_or_feed(*_digest, per_chunk_checksum, buf.begin(), buf.size()); } return buf; }); } future<> digest_file_random_access_reader::seek(uint64_t pos) noexcept { _pos = pos; return random_access_reader::seek(pos).then([this] { _digest.reset(); }); } }