diff --git a/configure.py b/configure.py
index 7fc76a3962..4432cc764c 100755
--- a/configure.py
+++ b/configure.py
@@ -278,6 +278,7 @@ scylla_tests = set([
     'test/boost/broken_sstable_test',
     'test/boost/bytes_ostream_test',
     'test/boost/cache_flat_mutation_reader_test',
+    'test/boost/cached_file_test',
     'test/boost/caching_options_test',
     'test/boost/canonical_mutation_test',
     'test/boost/cartesian_product_test',
diff --git a/test/boost/cached_file_test.cc b/test/boost/cached_file_test.cc
new file mode 100644
index 0000000000..945243a32c
--- /dev/null
+++ b/test/boost/cached_file_test.cc
@@ -0,0 +1,326 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <boost/test/unit_test.hpp> // NOTE(review): the seven include targets below were lost in transit (<...> stripped); best-effort reconstruction — verify against the original commit.
+#include <seastar/core/file.hh>
+#include <seastar/core/fstream.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/testing/thread_test_case.hh>
+#include <seastar/util/defer.hh>
+
+#include "test/lib/random_utils.hh"
+#include "test/lib/log.hh"
+#include "test/lib/tmpdir.hh"
+#include "test/lib/reader_permit.hh"
+
+#include "utils/cached_file.hh"
+
+using namespace seastar;
+
+// Drains the stream into a string, reading at most `limit` bytes.
+static sstring read_to_string(cached_file::stream& s, size_t limit = std::numeric_limits<size_t>::max()) {
+    sstring b;
+    while (auto buf = s.next().get0()) {
+        b += sstring(buf.get(), buf.size());
+        if (b.size() >= limit) {
+            break;
+        }
+    }
+    return b.substr(0, limit);
+}
+
+static sstring read_to_string(cached_file& cf, size_t off, size_t limit = std::numeric_limits<size_t>::max()) {
+    auto s = cf.read(off, default_priority_class());
+    return read_to_string(s, limit);
+}
+
+struct test_file {
+    tmpdir dir;
+    file f;
+    sstring contents;
+
+    ~test_file() {
+        f.close().get();
+    }
+};
+
+test_file make_test_file(size_t size) {
+    tmpdir dir;
+    auto contents = tests::random::get_sstring(size);
+
+    auto path = dir.path() / "file";
+    file f = open_file_dma(path.c_str(), open_flags::create | open_flags::rw).get0();
+
+    testlog.debug("file contents: {}", contents);
+
+    output_stream<char> out = make_file_output_stream(f).get0();
+    auto close_out = defer([&] { out.close().get(); });
+    out.write(contents.begin(), contents.size()).get();
+    out.flush().get();
+
+    f = open_file_dma(path.c_str(), open_flags::ro).get0();
+
+    return test_file{
+        .dir = std::move(dir),
+        .f = std::move(f),
+        .contents = std::move(contents)
+    };
+}
+
+SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
+    test_file tf = make_test_file(1024);
+
+    {
+        cached_file::metrics metrics;
+        cached_file cf(tf.f, tests::make_permit(), metrics, 0, tf.contents.size());
+
+        {
+            BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
+
+            BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+            BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+            BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+        }
+
+        {
+            BOOST_REQUIRE_EQUAL(tf.contents.substr(2), read_to_string(cf, 2));
+
+            BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+            BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_hits); // change here
+            BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+        }
+
+        {
+            BOOST_REQUIRE_EQUAL(sstring(), read_to_string(cf, 3000));
+
+            // no change
+            BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+            BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
+            BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+        }
+    }
+
+    {
+        size_t off = 100;
+        cached_file::metrics metrics;
+        cached_file cf(tf.f, tests::make_permit(), metrics, off, tf.contents.size() - off);
+
+        BOOST_REQUIRE_EQUAL(tf.contents.substr(off), read_to_string(cf, 0));
+        BOOST_REQUIRE_EQUAL(tf.contents.substr(off + 2), read_to_string(cf, 2));
+        BOOST_REQUIRE_EQUAL(sstring(), read_to_string(cf, 3000));
+    }
+}
+
+SEASTAR_THREAD_TEST_CASE(test_invalidation) {
+    auto page_size = cached_file::page_size;
+    test_file tf = make_test_file(page_size * 2);
+
+    cached_file::metrics metrics;
+    cached_file cf(tf.f, tests::make_permit(), metrics, 0, page_size * 2);
+
+    // Reads one page, half of the first page and half of the second page.
+    auto read = [&] {
+        BOOST_REQUIRE_EQUAL(
+            tf.contents.substr(page_size / 2, page_size),
+            read_to_string(cf, page_size / 2, page_size));
+    };
+
+    read();
+    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
+
+    metrics = {};
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, page_size / 2);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, page_size - 1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, page_size);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(page_size, page_size + 1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(page_size, page_size + page_size);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, page_size * 3);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most_front(0);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+
+    metrics = {};
+    cf.invalidate_at_most_front(1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+
+    metrics = {};
+    cf.invalidate_at_most_front(page_size);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
+
+    metrics = {};
+    cf.invalidate_at_most_front(page_size * 2);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
+
+    read();
+    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
+}
+
+SEASTAR_THREAD_TEST_CASE(test_invalidation_skewed_cached_file) {
+    auto page_size = cached_file::page_size;
+    test_file tf = make_test_file(page_size * 3);
+
+    size_t offset = page_size / 2;
+    cached_file::metrics metrics;
+    cached_file cf(tf.f, tests::make_permit(), metrics, offset, page_size * 2);
+
+    // Reads one page, half of the first page and half of the second page.
+    auto read = [&] {
+        BOOST_REQUIRE_EQUAL(
+            tf.contents.substr(offset, page_size),
+            read_to_string(cf, 0, page_size));
+    };
+
+    read();
+    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
+
+    metrics = {};
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, 1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(1, page_size - align_up(offset, page_size) - 1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, page_size);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(1, page_size);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(1, align_up(offset, page_size) - offset + page_size);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(1, align_up(offset, page_size) - offset + page_size - 1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most(0, page_size * 3);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
+    read();
+    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
+
+    metrics = {};
+    cf.invalidate_at_most_front(0);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+
+    metrics = {};
+    cf.invalidate_at_most_front(1);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
+
+    metrics = {};
+    cf.invalidate_at_most_front(align_up(offset, page_size) - offset + page_size);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
+    read();
+
+    metrics = {};
+    cf.invalidate_at_most_front(page_size * 3);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
+
+    read();
+    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
+    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
+    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
+}
diff --git a/utils/cached_file.hh b/utils/cached_file.hh
new file mode 100644
index 0000000000..f60bdd2809
--- /dev/null
+++ b/utils/cached_file.hh
@@ -0,0 +1,261 @@
+/*
+ * Copyright (C) 2019 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "reader_permit.hh"
+#include "utils/div_ceil.hh"
+
+#include <seastar/core/file.hh> // NOTE(review): include target lost in transit (<...> stripped); best-effort reconstruction.
+
+#include <map>
+
+using namespace seastar;
+
+/// \brief A read-through cache of a file.
+///
+/// Caches contents with page granularity (4 KiB).
+/// Cached pages are evicted manually using the invalidate_*() method family, or when the object is destroyed.
+///
+/// Concurrent reading is allowed.
+///
+/// The object is movable but this is only allowed before readers are created.
+///
+/// The cached_file can represent a subset of the file. The reason for this is so to satisfy
+/// two requirements. One is that we have a page-aligned caching, where pages are aligned
+/// relative to the start of the underlying file. This matches requirements of the seastar I/O engine
+/// on I/O requests.
+/// Another requirement is to have an effective way to populate the cache using an unaligned buffer
+/// which starts in the middle of the file when we know that we won't need to access bytes located
+/// before the buffer's position. See populate_front(). If we couldn't assume that, we wouldn't be
+/// able to insert an unaligned buffer into the cache.
+///
+class cached_file {
+public:
+    // Must be aligned to _file.disk_read_dma_alignment(). 4K is always safe.
+    static constexpr size_t page_size = 4096;
+
+    // The content of the underlying file (_file) is divided into pages
+    // of equal size (page_size). This type is used to identify pages.
+    // Pages are assigned consecutive identifiers starting from 0.
+    using page_idx_type = uint64_t;
+
+    using offset_type = uint64_t;
+
+    struct metrics {
+        uint64_t page_hits = 0;
+        uint64_t page_misses = 0;
+        uint64_t page_evictions = 0;
+        uint64_t page_populations = 0;
+        uint64_t cached_bytes = 0;
+    };
+private:
+    struct cached_page {
+        temporary_buffer<char> buf;
+        explicit cached_page(temporary_buffer<char> buf) : buf(std::move(buf)) {}
+    };
+
+    file _file;
+    reader_permit _permit;
+    metrics& _metrics;
+
+    using cache_type = std::map<page_idx_type, cached_page>;
+    cache_type _cache;
+
+    const offset_type _start;
+    const offset_type _size;
+
+    offset_type _last_page_size; // Ignores _start in case the start lies on the same page.
+    page_idx_type _last_page;
+private:
+    future<temporary_buffer<char>> get_page(page_idx_type idx, const io_priority_class& pc) {
+        auto i = _cache.lower_bound(idx);
+        if (i != _cache.end() && i->first == idx) {
+            ++_metrics.page_hits;
+            cached_page& cp = i->second;
+            return make_ready_future<temporary_buffer<char>>(cp.buf.share());
+        }
+        ++_metrics.page_misses;
+        auto size = idx == _last_page ? _last_page_size : page_size;
+        return _file.dma_read_exactly<char>(idx * page_size, size, pc)
+            .then([this, idx] (temporary_buffer<char>&& buf) mutable {
+                ++_metrics.page_populations;
+                _metrics.cached_bytes += buf.size();
+                _cache.emplace(idx, cached_page(buf.share()));
+                return std::move(buf);
+            });
+    }
+public:
+    // Generator of subsequent pages of data reflecting the contents of the file.
+    // Single-user.
+    class stream {
+        cached_file* _cached_file;
+        const io_priority_class* _pc;
+        page_idx_type _page_idx;
+        offset_type _offset_in_page;
+    public:
+        // Creates an empty stream.
+        stream()
+            : _cached_file(nullptr)
+            , _pc(nullptr)
+        { }
+
+        stream(cached_file& cf, const io_priority_class& pc, page_idx_type start_page, offset_type start_offset_in_page)
+            : _cached_file(&cf)
+            , _pc(&pc)
+            , _page_idx(start_page)
+            , _offset_in_page(start_offset_in_page)
+        { }
+
+        // Yields the next chunk of data.
+        // Returns empty buffer when end-of-stream is reached.
+        // Calls must be serialized.
+        // This instance must be kept alive until the returned future resolves.
+        future<temporary_buffer<char>> next() {
+            if (!_cached_file || _page_idx > _cached_file->_last_page) {
+                return make_ready_future<temporary_buffer<char>>(temporary_buffer<char>());
+            }
+            return _cached_file->get_page(_page_idx, *_pc).then([this] (temporary_buffer<char> page) {
+                if (_page_idx == _cached_file->_last_page) {
+                    page.trim(_cached_file->_last_page_size);
+                }
+                page.trim_front(_offset_in_page);
+                _offset_in_page = 0;
+                ++_page_idx;
+                return page;
+            });
+        }
+    };
+
+    size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
+        size_t count = 0;
+        while (start != end) {
+            ++count;
+            _metrics.cached_bytes -= start->second.buf.size();
+            start = _cache.erase(start);
+        }
+        _metrics.page_evictions += count;
+        return count;
+    }
+public:
+    /// \brief Constructs a cached_file.
+    ///
+    /// The cached area will reflect subset of f from the byte range [start, start + size).
+    ///
+    /// \param m Metrics object which should be updated from operations on this object.
+    ///          The metrics object can be shared by many cached_file instances, in which case it
+    ///          will reflect the sum of operations on all cached_file instances.
+    cached_file(file f, reader_permit permit, cached_file::metrics& m, offset_type start, offset_type size)
+        : _file(std::move(f))
+        , _permit(std::move(permit))
+        , _metrics(m)
+        , _start(start)
+        , _size(size)
+    {
+        offset_type last_byte_offset = _start + (_size ? (_size - 1) : 0);
+        _last_page_size = (last_byte_offset % page_size) + (_size ? 1 : 0);
+        _last_page = last_byte_offset / page_size;
+    }
+
+    cached_file(cached_file&&) = default;
+    cached_file(const cached_file&) = delete;
+
+    ~cached_file() {
+        evict_range(_cache.begin(), _cache.end());
+    }
+
+    /// \brief Populates cache from buf assuming that buf contains the data from the front of the area.
+    void populate_front(temporary_buffer<char> buf) {
+        // Align to page start. We can do this because the junk before _start won't be accessed.
+        auto pad = _start % page_size;
+        auto idx = _start / page_size;
+        buf = temporary_buffer<char>(buf.get_write() - pad, buf.size() + pad, buf.release());
+
+        while (buf.size() > page_size) {
+            auto page_buf = buf.share();
+            page_buf.trim(page_size);
+            ++_metrics.page_populations;
+            _metrics.cached_bytes += page_buf.size();
+            _cache.emplace(idx, cached_page(std::move(page_buf)));
+            buf.trim_front(page_size);
+            ++idx;
+        }
+
+        if (buf.size() == page_size || (idx == _last_page && buf.size() >= _last_page_size)) {
+            ++_metrics.page_populations;
+            _metrics.cached_bytes += buf.size();
+            _cache.emplace(idx, cached_page(std::move(buf)));
+        }
+    }
+
+    /// \brief Invalidates [start, end) or less.
+    ///
+    /// Invariants:
+    ///
+    ///   - all bytes outside [start, end) which were cached before the call will still be cached.
+    ///
+    void invalidate_at_most(offset_type start, offset_type end) {
+        auto lo_page = (_start + start) / page_size
+            // If start is 0 then we can drop the containing page
+            // even if _start is not aligned to the page start.
+            // Otherwise we cannot drop the page.
+            + bool((_start + start) % page_size) * bool(start != 0);
+
+        auto hi_page = (_start + end) / page_size;
+
+        if (lo_page < hi_page) {
+            evict_range(_cache.lower_bound(lo_page), _cache.lower_bound(hi_page));
+        }
+    }
+
+    /// \brief Equivalent to \ref invalidate_at_most(0, end).
+    void invalidate_at_most_front(offset_type end) {
+        evict_range(_cache.begin(), _cache.lower_bound((_start + end) / page_size));
+    }
+
+    /// \brief Read from the file
+    ///
+    /// Returns a stream with data which starts at position pos in the area managed by this instance.
+    /// This cached_file instance must outlive the returned stream.
+    /// The stream does not do any read-ahead.
+    ///
+    /// \param pos The offset of the first byte to read, relative to the cached file area.
+    stream read(offset_type pos, const io_priority_class& pc) {
+        if (pos >= _size) {
+            return stream();
+        }
+        auto global_pos = _start + pos;
+        auto offset = global_pos % page_size;
+        auto page_idx = global_pos / page_size;
+        return stream(*this, pc, page_idx, offset);
+    }
+
+    /// \brief Returns the number of bytes in the area managed by this instance.
+    offset_type size() const {
+        return _size;
+    }
+
+    /// \brief Returns the number of bytes cached.
+    size_t cached_bytes() const {
+        return _cache.size() * page_size;
+    }
+};