utils: Introduce cached_file

It is a read-through cache of a file.

Will be used to cache contents of the promoted index area from the
index file.

Currently, cached pages are evicted manually using the invalidate_*()
method family, or when the object is destroyed.

The cached_file represents a subset of the file. The reason for this
is to satisfy two requirements. One is that we have a page-aligned
caching, where pages are aligned relative to the start of the
underlying file. This matches requirements of the seastar I/O engine
on I/O requests.  Another requirement is to have an effective way to
populate the cache using an unaligned buffer which starts in the
middle of the file when we know that we won't need to access bytes
located before the buffer's position. See populate_front(). If we
couldn't assume that, we wouldn't be able to insert an unaligned
buffer into the cache.
This commit is contained in:
Tomasz Grabiec
2019-06-28 16:23:36 +02:00
parent ab274b8203
commit c95dd67d11
3 changed files with 588 additions and 0 deletions

View File

@@ -278,6 +278,7 @@ scylla_tests = set([
'test/boost/broken_sstable_test',
'test/boost/bytes_ostream_test',
'test/boost/cache_flat_mutation_reader_test',
'test/boost/cached_file_test',
'test/boost/caching_options_test',
'test/boost/canonical_mutation_test',
'test/boost/cartesian_product_test',

View File

@@ -0,0 +1,326 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/file.hh>
#include <seastar/core/reactor.hh>
#include <seastar/util/defer.hh>
#include "test/lib/random_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/reader_permit.hh"
#include "utils/cached_file.hh"
using namespace seastar;
// Drains the stream into a string until end-of-stream or until at least
// `limit` bytes were accumulated; the result is truncated to `limit` bytes.
static sstring read_to_string(cached_file::stream& s, size_t limit = std::numeric_limits<size_t>::max()) {
    sstring result;
    for (;;) {
        auto chunk = s.next().get0();
        if (!chunk) {
            break; // end-of-stream
        }
        result += sstring(chunk.get(), chunk.size());
        if (result.size() >= limit) {
            break;
        }
    }
    return result.substr(0, limit);
}
// Convenience overload: opens a stream over the cached file area at
// offset `off` and reads at most `limit` bytes into a string.
static sstring read_to_string(cached_file& cf, size_t off, size_t limit = std::numeric_limits<size_t>::max()) {
    cached_file::stream data_stream = cf.read(off, default_priority_class());
    return read_to_string(data_stream, limit);
}
// Bundles a temporary directory, an open file handle and the file's
// expected contents, so tests can compare reads against ground truth.
struct test_file {
    tmpdir dir;
    file f;           // handle to dir/"file" (reopened read-only by make_test_file())
    sstring contents; // the exact bytes written to the file
    ~test_file() {
        // Blocks on the close future; must run in a seastar thread context.
        f.close().get();
    }
};
// Creates a temporary file filled with `size` random bytes and returns it
// reopened read-only, together with the generated contents for verification.
// Must be called from a seastar thread (uses .get()/.get0() on futures).
test_file make_test_file(size_t size) {
    tmpdir dir;
    auto contents = tests::random::get_sstring(size);

    auto path = dir.path() / "file";
    file f = open_file_dma(path.c_str(), open_flags::create | open_flags::rw).get0();

    testlog.debug("file contents: {}", contents);

    output_stream<char> out = make_file_output_stream(f).get0();
    // Ensure the output stream is closed even if write()/flush() throws.
    auto close_out = defer([&] { out.close().get(); });
    out.write(contents.begin(), contents.size()).get();
    out.flush().get();

    // Reopen read-only so the tests cannot accidentally modify the file.
    f = open_file_dma(path.c_str(), open_flags::ro).get0();

    return test_file{
        .dir = std::move(dir),
        .f = std::move(f),
        .contents = std::move(contents)
    };
}
// Verifies reads and metric accounting for a file which fits in a single
// page, both for an area starting at offset 0 and for one starting at an
// unaligned offset.
SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
    test_file tf = make_test_file(1024);

    {
        cached_file::metrics metrics;
        cached_file cf(tf.f, tests::make_permit(), metrics, 0, tf.contents.size());

        {
            // First read: the single page is fetched from disk (miss + population).
            BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
            BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
            BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
            BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
            BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
            BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
        }

        {
            // Second read at a different offset within the same page: cache hit.
            BOOST_REQUIRE_EQUAL(tf.contents.substr(2), read_to_string(cf, 2));
            BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
            BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
            BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
            BOOST_REQUIRE_EQUAL(1, metrics.page_hits); // change here
            BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
        }

        {
            // Reading past the end of the area yields an empty result
            // and does not touch the cache or the metrics.
            BOOST_REQUIRE_EQUAL(sstring(), read_to_string(cf, 3000));

            // no change
            BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
            BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
            BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
            BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
            BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
        }
    }

    {
        // Same file, but the cached area starts at an unaligned offset;
        // reads are relative to the area, not to the file.
        size_t off = 100;
        cached_file::metrics metrics;
        cached_file cf(tf.f, tests::make_permit(), metrics, off, tf.contents.size() - off);
        BOOST_REQUIRE_EQUAL(tf.contents.substr(off), read_to_string(cf, 0));
        BOOST_REQUIRE_EQUAL(tf.contents.substr(off + 2), read_to_string(cf, 2));
        BOOST_REQUIRE_EQUAL(sstring(), read_to_string(cf, 3000));
    }
}
// Exercises the page-boundary logic of invalidate_at_most() and
// invalidate_at_most_front() for an area which starts at offset 0
// (i.e. page-aligned with the underlying file).
SEASTAR_THREAD_TEST_CASE(test_invalidation) {
    auto page_size = cached_file::page_size;
    test_file tf = make_test_file(page_size * 2);

    cached_file::metrics metrics;
    cached_file cf(tf.f, tests::make_permit(), metrics, 0, page_size * 2);

    // Reads one page, half of the first page and half of the second page.
    auto read = [&] {
        BOOST_REQUIRE_EQUAL(
            tf.contents.substr(page_size / 2, page_size),
            read_to_string(cf, page_size / 2, page_size));
    };

    // Initial read populates both pages from disk.
    read();
    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);

    // Subsequent read is served entirely from the cache.
    metrics = {};
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // Invalidating less than a full page must not evict anything.
    metrics = {};
    cf.invalidate_at_most(0, page_size / 2);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // One byte short of a full page: still nothing evicted.
    metrics = {};
    cf.invalidate_at_most(0, page_size - 1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // Exactly the first page: it is evicted and re-read on the next access.
    metrics = {};
    cf.invalidate_at_most(0, page_size);
    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);

    // A sub-page range inside the second page: no eviction.
    metrics = {};
    cf.invalidate_at_most(page_size, page_size + 1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // The whole second page: evicted and re-read.
    metrics = {};
    cf.invalidate_at_most(page_size, page_size + page_size);
    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);

    // A range covering the whole area (and beyond) evicts both pages.
    metrics = {};
    cf.invalidate_at_most(0, page_size * 3);
    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);

    // invalidate_at_most_front(): prefixes shorter than a page evict nothing.
    metrics = {};
    cf.invalidate_at_most_front(0);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);

    metrics = {};
    cf.invalidate_at_most_front(1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);

    // A full-page prefix evicts the first page.
    metrics = {};
    cf.invalidate_at_most_front(page_size);
    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);

    // A two-page prefix evicts the remaining (second) page.
    metrics = {};
    cf.invalidate_at_most_front(page_size * 2);
    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
}
// Same as test_invalidation, but the cached area starts in the middle of
// a page, so area-relative offsets passed to invalidate_*() are skewed
// relative to page boundaries of the underlying file.
SEASTAR_THREAD_TEST_CASE(test_invalidation_skewed_cached_file) {
    auto page_size = cached_file::page_size;
    test_file tf = make_test_file(page_size * 3);

    size_t offset = page_size / 2;

    cached_file::metrics metrics;
    cached_file cf(tf.f, tests::make_permit(), metrics, offset, page_size * 2);

    // Reads one page, half of the first page and half of the second page.
    auto read = [&] {
        BOOST_REQUIRE_EQUAL(
            tf.contents.substr(offset, page_size),
            read_to_string(cf, 0, page_size));
    };

    // Initial read populates both touched file pages from disk.
    read();
    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);

    metrics = {};
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // A one-byte range at the front: nothing evicted.
    metrics = {};
    cf.invalidate_at_most(0, 1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // NOTE(review): with offset == page_size / 2, align_up(offset, page_size)
    // equals page_size, so this `end` expression wraps around (unsigned
    // underflow); no eviction happens because the resulting hi_page ends up
    // below lo_page inside invalidate_at_most(). Confirm this is intended.
    metrics = {};
    cf.invalidate_at_most(1, page_size - align_up(offset, page_size) - 1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // start == 0 allows dropping the first file page even though _start
    // is not aligned to a page boundary.
    metrics = {};
    cf.invalidate_at_most(0, page_size);
    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);

    // The same range starting at 1: the first page cannot be dropped.
    metrics = {};
    cf.invalidate_at_most(1, page_size);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // End lands exactly on the second file-page boundary: page evicted.
    metrics = {};
    cf.invalidate_at_most(1, align_up(offset, page_size) - offset + page_size);
    BOOST_REQUIRE_EQUAL(1, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(1, metrics.page_hits);

    // One byte short of that boundary: nothing evicted.
    metrics = {};
    cf.invalidate_at_most(1, align_up(offset, page_size) - offset + page_size - 1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(0, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(0, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(2, metrics.page_hits);

    // A range covering the whole area evicts both cached pages.
    metrics = {};
    cf.invalidate_at_most(0, page_size * 3);
    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);

    // Front invalidation: prefixes which do not reach a file-page boundary
    // evict nothing.
    metrics = {};
    cf.invalidate_at_most_front(0);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);

    metrics = {};
    cf.invalidate_at_most_front(1);
    BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);

    // A prefix reaching past both cached pages evicts them both.
    metrics = {};
    cf.invalidate_at_most_front(align_up(offset, page_size) - offset + page_size);
    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
    read();

    metrics = {};
    cf.invalidate_at_most_front(page_size * 3);
    BOOST_REQUIRE_EQUAL(2, metrics.page_evictions);
    read();
    BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
    BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
    BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
}

261
utils/cached_file.hh Normal file
View File

@@ -0,0 +1,261 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "reader_permit.hh"
#include "utils/div_ceil.hh"
#include <seastar/core/file.hh>
#include <map>
using namespace seastar;
/// \brief A read-through cache of a file.
///
/// Caches contents with page granularity (4 KiB).
/// Cached pages are evicted manually using the invalidate_*() method family, or when the object is destroyed.
///
/// Concurrent reading is allowed.
///
/// The object is movable but this is only allowed before readers are created.
///
/// The cached_file can represent a subset of the file. The reason for this is to satisfy
/// two requirements. One is that we have a page-aligned caching, where pages are aligned
/// relative to the start of the underlying file. This matches requirements of the seastar I/O engine
/// on I/O requests.
/// Another requirement is to have an effective way to populate the cache using an unaligned buffer
/// which starts in the middle of the file when we know that we won't need to access bytes located
/// before the buffer's position. See populate_front(). If we couldn't assume that, we wouldn't be
/// able to insert an unaligned buffer into the cache.
///
class cached_file {
public:
    // Must be aligned to _file.disk_read_dma_alignment(). 4K is always safe.
    static constexpr size_t page_size = 4096;

    // The content of the underlying file (_file) is divided into pages
    // of equal size (page_size). This type is used to identify pages.
    // Pages are assigned consecutive identifiers starting from 0.
    using page_idx_type = uint64_t;

    using offset_type = uint64_t;

    // Counters updated by cache operations. A single metrics object may be
    // shared by several cached_file instances (see the constructor).
    struct metrics {
        uint64_t page_hits = 0;
        uint64_t page_misses = 0;
        uint64_t page_evictions = 0;
        uint64_t page_populations = 0;
        uint64_t cached_bytes = 0; // sum of sizes of the buffers currently cached
    };
private:
    // A single cached page; holds a shared temporary_buffer with the page's data.
    struct cached_page {
        temporary_buffer<char> buf;
        explicit cached_page(temporary_buffer<char> buf) : buf(std::move(buf)) {}
    };

    file _file;
    reader_permit _permit;
    metrics& _metrics;

    using cache_type = std::map<page_idx_type, cached_page>;
    cache_type _cache;

    const offset_type _start; // offset of the cached area within _file
    const offset_type _size;  // length in bytes of the cached area

    // Size of the last page of the area, measured from the beginning of that
    // file page (not from _start).
    offset_type _last_page_size; // Ignores _start in case the start lies on the same page.
    page_idx_type _last_page;
private:
    // Returns the contents of the page with the given index, serving it from
    // the cache on a hit, or reading it from _file and populating the cache
    // on a miss. Updates hit/miss/population metrics accordingly.
    future<temporary_buffer<char>> get_page(page_idx_type idx, const io_priority_class& pc) {
        auto i = _cache.lower_bound(idx);
        if (i != _cache.end() && i->first == idx) {
            ++_metrics.page_hits;
            cached_page& cp = i->second;
            // share() returns a view over the same underlying storage.
            return make_ready_future<temporary_buffer<char>>(cp.buf.share());
        }
        ++_metrics.page_misses;
        // The last page of the area may be shorter than page_size.
        auto size = idx == _last_page ? _last_page_size : page_size;
        return _file.dma_read_exactly<char>(idx * page_size, size, pc)
            .then([this, idx] (temporary_buffer<char>&& buf) mutable {
                ++_metrics.page_populations;
                _metrics.cached_bytes += buf.size();
                _cache.emplace(idx, cached_page(buf.share()));
                return std::move(buf);
            });
    }
public:
    // Generator of subsequent pages of data reflecting the contents of the file.
    // Single-user.
    class stream {
        cached_file* _cached_file; // nullptr means an empty (end-of-stream) stream
        const io_priority_class* _pc;
        page_idx_type _page_idx;
        offset_type _offset_in_page;
    public:
        // Creates an empty stream.
        stream()
            : _cached_file(nullptr)
            , _pc(nullptr)
        { }

        stream(cached_file& cf, const io_priority_class& pc, page_idx_type start_page, offset_type start_offset_in_page)
            : _cached_file(&cf)
            , _pc(&pc)
            , _page_idx(start_page)
            , _offset_in_page(start_offset_in_page)
        { }

        // Yields the next chunk of data.
        // Returns empty buffer when end-of-stream is reached.
        // Calls must be serialized.
        // This instance must be kept alive until the returned future resolves.
        future<temporary_buffer<char>> next() {
            if (!_cached_file || _page_idx > _cached_file->_last_page) {
                return make_ready_future<temporary_buffer<char>>(temporary_buffer<char>());
            }
            return _cached_file->get_page(_page_idx, *_pc).then([this] (temporary_buffer<char> page) {
                if (_page_idx == _cached_file->_last_page) {
                    // A cached last page can be longer than the area's tail
                    // (e.g. when inserted via populate_front()); trim the excess.
                    page.trim(_cached_file->_last_page_size);
                }
                // Only the first yielded page may start mid-page.
                page.trim_front(_offset_in_page);
                _offset_in_page = 0;
                ++_page_idx;
                return page;
            });
        }
    };

    // Evicts the pages in the given cache iterator range, updating the
    // eviction and cached-bytes metrics. Returns the number of pages evicted.
    size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
        size_t count = 0;
        while (start != end) {
            ++count;
            _metrics.cached_bytes -= start->second.buf.size();
            start = _cache.erase(start);
        }
        _metrics.page_evictions += count;
        return count;
    }
public:
    /// \brief Constructs a cached_file.
    ///
    /// The cached area will reflect subset of f from the byte range [start, start + size).
    ///
    /// \param m Metrics object which should be updated from operations on this object.
    ///          The metrics object can be shared by many cached_file instances, in which case it
    ///          will reflect the sum of operations on all cached_file instances.
    cached_file(file f, reader_permit permit, cached_file::metrics& m, offset_type start, offset_type size)
        : _file(std::move(f))
        , _permit(std::move(permit))
        , _metrics(m)
        , _start(start)
        , _size(size)
    {
        // File offset of the last byte of the area (or _start for an empty area).
        offset_type last_byte_offset = _start + (_size ? (_size - 1) : 0);
        // Counted from the beginning of the last file page, hence it includes
        // any bytes before _start which share that page; 0 for an empty area.
        _last_page_size = (last_byte_offset % page_size) + (_size ? 1 : 0);
        _last_page = last_byte_offset / page_size;
    }

    cached_file(cached_file&&) = default;
    cached_file(const cached_file&) = delete;

    ~cached_file() {
        // Release all cached pages, keeping the shared metrics consistent.
        evict_range(_cache.begin(), _cache.end());
    }

    /// \brief Populates cache from buf assuming that buf contains the data from the front of the area.
    void populate_front(temporary_buffer<char> buf) {
        // Align to page start. We can do this because the junk before _start won't be accessed.
        auto pad = _start % page_size;
        auto idx = _start / page_size;
        buf = temporary_buffer<char>(buf.get_write() - pad, buf.size() + pad, buf.release());

        // Insert every fully-covered page.
        while (buf.size() > page_size) {
            auto page_buf = buf.share();
            page_buf.trim(page_size);
            ++_metrics.page_populations;
            _metrics.cached_bytes += page_buf.size();
            _cache.emplace(idx, cached_page(std::move(page_buf)));
            buf.trim_front(page_size);
            ++idx;
        }

        // The remainder is inserted only if it covers a full page, or covers
        // the area's last page entirely; a partially-covered middle page
        // cannot be cached, since later reads of it would see truncated data.
        if (buf.size() == page_size || (idx == _last_page && buf.size() >= _last_page_size)) {
            ++_metrics.page_populations;
            _metrics.cached_bytes += buf.size();
            _cache.emplace(idx, cached_page(std::move(buf)));
        }
    }

    /// \brief Invalidates [start, end) or less.
    ///
    /// Invariants:
    ///
    ///   - all bytes outside [start, end) which were cached before the call will still be cached.
    ///
    /// Offsets are relative to the cached area; only pages fully contained
    /// in the byte range are evicted.
    void invalidate_at_most(offset_type start, offset_type end) {
        auto lo_page = (_start + start) / page_size
            // If start is 0 then we can drop the containing page
            // even if _start is not aligned to the page start.
            // Otherwise we cannot drop the page.
            + bool((_start + start) % page_size) * bool(start != 0);
        auto hi_page = (_start + end) / page_size;
        if (lo_page < hi_page) {
            evict_range(_cache.lower_bound(lo_page), _cache.lower_bound(hi_page));
        }
    }

    /// \brief Equivalent to \ref invalidate_at_most(0, end).
    void invalidate_at_most_front(offset_type end) {
        evict_range(_cache.begin(), _cache.lower_bound((_start + end) / page_size));
    }

    /// \brief Read from the file
    ///
    /// Returns a stream with data which starts at position pos in the area managed by this instance.
    /// This cached_file instance must outlive the returned stream.
    /// The stream does not do any read-ahead.
    ///
    /// \param pos The offset of the first byte to read, relative to the cached file area.
    stream read(offset_type pos, const io_priority_class& pc) {
        if (pos >= _size) {
            return stream(); // past the end: empty stream
        }
        auto global_pos = _start + pos;
        auto offset = global_pos % page_size;
        auto page_idx = global_pos / page_size;
        return stream(*this, pc, page_idx, offset);
    }

    /// \brief Returns the number of bytes in the area managed by this instance.
    offset_type size() const {
        return _size;
    }

    /// \brief Returns the number of bytes cached.
    /// NOTE: this is an upper bound — the last cached page may be shorter
    /// than page_size; metrics::cached_bytes tracks the exact count.
    size_t cached_bytes() const {
        return _cache.size() * page_size;
    }
};