sstables: Cache all index file reads

After this patch, there is a single index file page cache per
sstable, shared by index readers. The cache survives reads,
which reduces the amount of I/O on subsequent reads.

As part of this, cached_file needed to be adjusted in the following ways.

The page cache may occupy a significant portion of memory. Keeping the
pages in the standard allocator could cause memory fragmentation
problems. To avoid them, cached_file is changed to keep buffers in LSA
using the lsa_buffer allocation method.

When a page is needed by the seastar I/O layer, it needs to be copied
to a temporary_buffer which is stable, so must be allocated in the
standard allocator space. We copy the page on-demand. Concurrent
requests for the same page will share the temporary_buffer. When page
is not used, it only lives in the LSA space.

In subsequent patches, cached_file::stream will be adjusted to also support
access via cached_page::ptr_type directly, to avoid materializing a
temporary_buffer.

While a page is used, it is not linked in the LRU so that it is not
freed. This ensures that the storage which is actively consumed
remains stable, either via temporary_buffer (kept alive by its
deleter), or by cached_page::ptr_type directly.
This commit is contained in:
Tomasz Grabiec
2021-02-22 18:55:36 +01:00
parent b5ca0eb2a2
commit 078a6e422b
6 changed files with 259 additions and 48 deletions

View File

@@ -292,17 +292,10 @@ std::unique_ptr<clustered_index_cursor> promoted_index::make_cursor(shared_sstab
}
if (_use_binary_search) {
cached_file f(_index_file,
index_page_cache_metrics,
_promoted_index_start,
_promoted_index_size,
trace_state ? sst->filename(component_type::Index) : sstring());
f.populate_front(_front.share());
return std::make_unique<mc::bsearch_clustered_cursor>(*sst->get_schema(),
_promoted_index_start, _promoted_index_size,
promoted_index_cache_metrics, permit,
*ck_values_fixed_lengths, std::move(f), options.io_priority_class, _num_blocks, trace_state);
*ck_values_fixed_lengths, *sst->_cached_index_file, options.io_priority_class, _num_blocks, trace_state);
}
input_stream<char> promoted_index_stream = [&] {

View File

@@ -169,10 +169,12 @@ private:
block_set_type _blocks;
public:
const schema& _s;
uint64_t _promoted_index_start;
uint64_t _promoted_index_size;
metrics& _metrics;
const pi_index_type _blocks_count;
const io_priority_class _pc;
cached_file _cached_file;
cached_file& _cached_file;
data_consumer::primitive_consumer _primitive_parser;
clustering_parser _clustering_parser;
promoted_index_block_parser _block_parser;
@@ -197,11 +199,11 @@ private:
// The offset is relative to the promoted index start in the index file.
// idx must be in the range 0..(_blocks_count-1)
pi_offset_type get_offset_entry_pos(pi_index_type idx) const {
return _cached_file.size() - (_blocks_count - idx) * sizeof(pi_offset_type);
return _promoted_index_size - (_blocks_count - idx) * sizeof(pi_offset_type);
}
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
_stream = _cached_file.read(get_offset_entry_pos(idx), _pc, _permit, trace_state);
_stream = _cached_file.read(_promoted_index_start + get_offset_entry_pos(idx), _pc, _permit, trace_state);
return _stream.next().then([this, idx] (temporary_buffer<char>&& buf) {
if (__builtin_expect(_primitive_parser.read_32(buf) == data_consumer::read_status::ready, true)) {
return make_ready_future<pi_offset_type>(_primitive_parser._u32);
@@ -215,7 +217,7 @@ private:
// Postconditions:
// - block.start is engaged and valid.
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
_stream = _cached_file.read(block.offset, _pc, _permit, trace_state);
_stream = _cached_file.read(_promoted_index_start + block.offset, _pc, _permit, trace_state);
_clustering_parser.reset();
return consume_stream(_stream, _clustering_parser).then([this, &block] {
auto mem_before = block.memory_usage();
@@ -227,7 +229,7 @@ private:
// Postconditions:
// - block.end is engaged, all fields in the block are valid
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
_stream = _cached_file.read(block.offset, _pc, _permit, trace_state);
_stream = _cached_file.read(_promoted_index_start + block.offset, _pc, _permit, trace_state);
_block_parser.reset();
return consume_stream(_stream, _block_parser).then([this, &block] {
auto mem_before = block.memory_usage();
@@ -267,18 +269,22 @@ private:
}
public:
cached_promoted_index(const schema& s,
uint64_t promoted_index_start,
uint64_t promoted_index_size,
metrics& m,
reader_permit permit,
column_values_fixed_lengths cvfl,
cached_file f,
cached_file& f,
io_priority_class pc,
pi_index_type blocks_count)
: _blocks(block_comparator{s})
, _s(s)
, _promoted_index_start(promoted_index_start)
, _promoted_index_size(promoted_index_size)
, _metrics(m)
, _blocks_count(blocks_count)
, _pc(pc)
, _cached_file(std::move(f))
, _cached_file(f)
, _primitive_parser(permit)
, _clustering_parser(s, permit, cvfl, true)
, _block_parser(s, permit, std::move(cvfl))
@@ -343,8 +349,6 @@ public:
// Invalidates information about blocks with smaller indexes than a given block.
void invalidate_prior(promoted_index_block* block, tracing::trace_state_ptr trace_state) {
_cached_file.invalidate_at_most_front(block->offset, trace_state);
_cached_file.invalidate_at_most(get_offset_entry_pos(0), get_offset_entry_pos(block->index), trace_state);
erase_range(_blocks.begin(), _blocks.lower_bound(block->index));
}
@@ -438,16 +442,26 @@ private:
}
public:
bsearch_clustered_cursor(const schema& s,
uint64_t promoted_index_start,
uint64_t promoted_index_size,
cached_promoted_index::metrics& metrics,
reader_permit permit,
column_values_fixed_lengths cvfl,
cached_file f,
cached_file& f,
io_priority_class pc,
pi_index_type blocks_count,
tracing::trace_state_ptr trace_state)
: _s(s)
, _blocks_count(blocks_count)
, _promoted_index(s, metrics, std::move(permit), std::move(cvfl), std::move(f), pc, blocks_count)
, _promoted_index(s,
promoted_index_start,
promoted_index_size,
metrics,
std::move(permit),
std::move(cvfl),
f,
pc,
blocks_count)
, _trace_state(std::move(trace_state))
{ }

View File

@@ -85,6 +85,7 @@
#include "kl/reader.hh"
#include "mx/reader.hh"
#include "utils/bit_cast.hh"
#include "utils/cached_file.hh"
thread_local disk_error_signal_type sstable_read_error;
thread_local disk_error_signal_type sstable_write_error;
@@ -1343,6 +1344,13 @@ future<> sstable::update_info_for_opened_data() {
}).then([this] {
return _index_file.size().then([this] (auto size) {
_index_file_size = size;
assert(!_cached_index_file);
_cached_index_file = seastar::make_shared<cached_file>(_index_file,
index_page_cache_metrics, 0,
_manager.get_cache_tracker().get_lru(),
_manager.get_cache_tracker().region(),
_index_file_size);
_index_file = make_cached_seastar_file(*_cached_index_file);
});
}).then([this] {
if (this->has_component(component_type::Filter)) {
@@ -2849,6 +2857,8 @@ future<> init_metrics() {
sm::description("Total number of index page cache pages which were inserted into the cache")),
sm::make_gauge("index_page_cache_bytes", [] { return index_page_cache_metrics.cached_bytes; },
sm::description("Total number of bytes cached in the index page cache")),
sm::make_gauge("index_page_cache_bytes_in_std", [] { return index_page_cache_metrics.bytes_in_std; },
sm::description("Total number of bytes in temporary buffers which live in the std allocator")),
sm::make_derive("pi_cache_hits_l0", [] { return promoted_index_cache_metrics.hits_l0; },
sm::description("Number of requests for promoted index block in state l0 which didn't have to go to the page cache")),

View File

@@ -68,6 +68,7 @@
class sstable_assertions;
class flat_mutation_reader;
class cached_file;
namespace sstables {
@@ -457,6 +458,7 @@ private:
// it is then used to generate the ancestors metadata in the statistics or scylla components.
std::set<int> _compaction_ancestors;
file _index_file;
seastar::shared_ptr<cached_file> _cached_index_file;
file _data_file;
uint64_t _data_file_size;
uint64_t _index_file_size;
@@ -815,6 +817,7 @@ public:
friend class sstable_writer;
friend class mc::writer;
friend class index_reader;
friend class promoted_index;
friend class compaction;
friend class sstables_manager;
template <typename DataConsumeRowsContext>

View File

@@ -22,6 +22,7 @@
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/file.hh>
#include <seastar/core/reactor.hh>
@@ -36,6 +37,8 @@
using namespace seastar;
static lru cf_lru;
static sstring read_to_string(cached_file::stream& s, size_t limit = std::numeric_limits<size_t>::max()) {
sstring b;
while (auto buf = s.next().get0()) {
@@ -96,7 +99,8 @@ SEASTAR_THREAD_TEST_CASE(test_file_wrapper) {
auto page_size = cached_file::page_size;
cached_file::metrics metrics;
test_file tf = make_test_file(page_size * 3);
cached_file cf(tf.f, metrics, 0, page_size * 3);
logalloc::region region;
cached_file cf(tf.f, metrics, 0, cf_lru, region, page_size * 3);
seastar::file f = make_cached_seastar_file(cf);
BOOST_REQUIRE_EQUAL(tf.contents.substr(0, 1),
@@ -114,17 +118,42 @@ SEASTAR_THREAD_TEST_CASE(test_file_wrapper) {
BOOST_CHECK_THROW(read_to_string(f, 0, cf.size() + 1), seastar::file::eof_error);
}
SEASTAR_THREAD_TEST_CASE(test_concurrent_population) {
auto page_size = cached_file::page_size;
cached_file::metrics metrics;
test_file tf = make_test_file(page_size * 3);
logalloc::region region;
cached_file cf(tf.f, metrics, 0, cf_lru, region, page_size * 3);
seastar::file f = make_cached_seastar_file(cf);
seastar::when_all(
seastar::async([&] {
BOOST_REQUIRE_EQUAL(tf.contents.substr(0, 1), read_to_string(f, 0, 1));
}),
seastar::async([&] {
BOOST_REQUIRE_EQUAL(tf.contents.substr(0, 1), read_to_string(f, 0, 1));
})
).get();
BOOST_REQUIRE_EQUAL(page_size, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
}
SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
test_file tf = make_test_file(1024);
{
cached_file::metrics metrics;
cached_file cf(tf.f, metrics, 0, tf.contents.size());
logalloc::region region;
cached_file cf(tf.f, metrics, 0, cf_lru, region, tf.contents.size());
{
BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(cached_file::page_size, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
@@ -134,7 +163,7 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
{
BOOST_REQUIRE_EQUAL(tf.contents.substr(2), read_to_string(cf, 2));
BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(cached_file::page_size, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(1, metrics.page_hits); // change here
@@ -145,7 +174,7 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
BOOST_REQUIRE_EQUAL(sstring(), read_to_string(cf, 3000));
// no change
BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(cached_file::page_size, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
@@ -156,7 +185,8 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
{
size_t off = 100;
cached_file::metrics metrics;
cached_file cf(tf.f, metrics, off, tf.contents.size() - off);
logalloc::region region;
cached_file cf(tf.f, metrics, off, cf_lru, region, tf.contents.size() - off);
BOOST_REQUIRE_EQUAL(tf.contents.substr(off), read_to_string(cf, 0));
BOOST_REQUIRE_EQUAL(tf.contents.substr(off + 2), read_to_string(cf, 2));
@@ -164,12 +194,86 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
}
}
SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
auto page = cached_file::page_size;
auto file_size = page * 2 + 12;
test_file tf = make_test_file(file_size);
{
cached_file::metrics metrics;
logalloc::region region;
cached_file cf(tf.f, metrics, 0, cf_lru, region, tf.contents.size());
{
BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
BOOST_REQUIRE_EQUAL(page * 3, metrics.cached_bytes);
BOOST_REQUIRE_EQUAL(page * 3, cf.cached_bytes());
BOOST_REQUIRE_EQUAL(3, metrics.page_misses);
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
}
{
cf_lru.evict_all();
BOOST_REQUIRE_EQUAL(0, metrics.cached_bytes); // change here
BOOST_REQUIRE_EQUAL(0, cf.cached_bytes()); // change here
BOOST_REQUIRE_EQUAL(3, metrics.page_misses);
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions); // change here
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
}
{
BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
BOOST_REQUIRE_EQUAL(page * 3, metrics.cached_bytes); // change here
BOOST_REQUIRE_EQUAL(page * 3, cf.cached_bytes());
BOOST_REQUIRE_EQUAL(6, metrics.page_misses); // change here
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
BOOST_REQUIRE_EQUAL(6, metrics.page_populations); // change here
}
{
// Test that the page which is touched is evicted last
BOOST_REQUIRE_EQUAL(tf.contents.substr(page, 1), read_to_string(cf, page, 1)); // hit page 1
BOOST_REQUIRE_EQUAL(6, metrics.page_misses);
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions);
BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
BOOST_REQUIRE_EQUAL(6, metrics.page_populations);
cf_lru.evict();
BOOST_REQUIRE_EQUAL(tf.contents.substr(page, 1), read_to_string(cf, page, 1)); // hit page 1
BOOST_REQUIRE_EQUAL(6, metrics.page_misses);
BOOST_REQUIRE_EQUAL(4, metrics.page_evictions); // change
BOOST_REQUIRE_EQUAL(2, metrics.page_hits); // change
BOOST_REQUIRE_EQUAL(6, metrics.page_populations);
cf_lru.evict();
BOOST_REQUIRE_EQUAL(tf.contents.substr(page, 1), read_to_string(cf, page, 1)); // hit page 1
BOOST_REQUIRE_EQUAL(6, metrics.page_misses);
BOOST_REQUIRE_EQUAL(5, metrics.page_evictions); // change
BOOST_REQUIRE_EQUAL(3, metrics.page_hits); // change
BOOST_REQUIRE_EQUAL(6, metrics.page_populations);
}
}
}
SEASTAR_THREAD_TEST_CASE(test_invalidation) {
auto page_size = cached_file::page_size;
test_file tf = make_test_file(page_size * 2);
cached_file::metrics metrics;
cached_file cf(tf.f, metrics, 0, page_size * 2);
logalloc::region region;
cached_file cf(tf.f, metrics, 0, cf_lru, region, page_size * 2);
// Reads one page, half of the first page and half of the second page.
auto read = [&] {
@@ -260,7 +364,8 @@ SEASTAR_THREAD_TEST_CASE(test_invalidation_skewed_cached_file) {
size_t offset = page_size / 2;
cached_file::metrics metrics;
cached_file cf(tf.f, metrics, offset, page_size * 2);
logalloc::region region;
cached_file cf(tf.f, metrics, offset, cf_lru, region, page_size * 2);
// Reads one page, half of the first page and half of the second page.
auto read = [&] {

View File

@@ -24,6 +24,7 @@
#include "reader_permit.hh"
#include "utils/div_ceil.hh"
#include "utils/bptree.hh"
#include "utils/lru.hh"
#include "tracing/trace_state.hh"
#include <seastar/core/file.hh>
@@ -35,7 +36,7 @@ using namespace seastar;
/// \brief A read-through cache of a file.
///
/// Caches contents with page granularity (4 KiB).
/// Cached pages are evicted manually using the invalidate_*() method family, or when the object is destroyed.
/// Cached pages are evicted by the LRU or manually using the invalidate_*() method family, or when the object is destroyed.
///
/// Concurrent reading is allowed.
///
@@ -68,19 +69,78 @@ public:
uint64_t page_evictions = 0;
uint64_t page_populations = 0;
uint64_t cached_bytes = 0;
uint64_t bytes_in_std = 0; // memory used by active temporary_buffer:s
};
private:
class cached_page {
class cached_page : public evictable {
public:
cached_file* parent;
page_idx_type idx;
temporary_buffer<char> buf;
logalloc::lsa_buffer _lsa_buf;
temporary_buffer<char> _buf; // Empty when not shared. May mirror _lsa_buf when shared.
size_t _use_count = 0;
public:
struct cached_page_del {
void operator()(cached_page* cp) {
if (--cp->_use_count == 0) {
cp->parent->_metrics.bytes_in_std -= cp->_buf.size();
cp->_buf = {};
cp->parent->_lru.add(*cp);
}
}
};
using ptr_type = std::unique_ptr<cached_page, cached_page_del>;
// As long as any ptr_type is alive, this cached_page will not be destroyed
// because it will not be linked in the LRU.
ptr_type share() noexcept {
if (_use_count++ == 0) {
unlink_from_lru();
}
return std::unique_ptr<cached_page, cached_page_del>(this);
}
public:
explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
: parent(parent)
, idx(idx)
, buf(std::move(buf))
{ }
cached_page(cached_page&&) noexcept = default;
, _buf(std::move(buf))
{
_lsa_buf = parent->_region.alloc_buf(_buf.size());
parent->_metrics.bytes_in_std += _buf.size();
std::copy(_buf.begin(), _buf.end(), _lsa_buf.get());
}
cached_page(cached_page&&) noexcept {
// The move constructor is required by allocation_strategy::construct() due to generic bplus::tree,
// but the object is always allocated in the standard allocator context so never actually moved.
// We cannot properly implement the move constructor because "this" is captured in various places.
abort();
}
~cached_page() {
assert(!_use_count);
}
void on_evicted() noexcept override;
temporary_buffer<char> get_buf() {
auto self = share();
if (!_buf) {
_buf = temporary_buffer<char>(_lsa_buf.size());
parent->_metrics.bytes_in_std += _lsa_buf.size();
std::copy(_lsa_buf.get(), _lsa_buf.get() + _lsa_buf.size(), _buf.get_write());
}
// Holding to a temporary buffer holds the cached page so that the buffer can be reused by concurrent hits.
// Also, sharing cached_page keeps the temporary_buffer's storage alive.
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
}
size_t size_in_allocator() {
// lsa_buf occupies 4K in LSA even if the buf size is smaller.
// _buf is transient and not accounted here.
return page_size;
}
};
struct page_idx_less_comparator {
@@ -92,6 +152,8 @@ private:
file _file;
sstring _file_name; // for logging / tracing
metrics& _metrics;
lru& _lru;
logalloc::region& _region;
using cache_type = bplus::tree<page_idx_type, cached_page, page_idx_less_comparator, 12, bplus::key_search::linear>;
cache_type _cache;
@@ -103,27 +165,37 @@ private:
offset_type _last_page_size; // Ignores _start in case the start lies on the same page.
page_idx_type _last_page;
private:
future<temporary_buffer<char>> get_page(page_idx_type idx, const io_priority_class& pc,
future<cached_page::ptr_type> get_page_ptr(page_idx_type idx, const io_priority_class& pc,
tracing::trace_state_ptr trace_state) {
auto i = _cache.lower_bound(idx);
if (i != _cache.end() && i->idx == idx) {
++_metrics.page_hits;
tracing::trace(trace_state, "page cache hit: file={}, page={}", _file_name, idx);
cached_page& cp = *i;
return make_ready_future<temporary_buffer<char>>(cp.buf.share());
return make_ready_future<cached_page::ptr_type>(cp.share());
}
tracing::trace(trace_state, "page cache miss: file={}, page={}", _file_name, idx);
++_metrics.page_misses;
auto size = idx == _last_page ? _last_page_size : page_size;
return _file.dma_read_exactly<char>(idx * page_size, size, pc)
.then([this, idx] (temporary_buffer<char>&& buf) mutable {
++_metrics.page_populations;
_metrics.cached_bytes += buf.size();
_cached_bytes += buf.size();
_cache.emplace(idx, cached_page(this, idx, buf.share()));
return std::move(buf);
auto it_and_flag = _cache.emplace(idx, this, idx, std::move(buf));
cached_page& cp = *it_and_flag.first;
if (it_and_flag.second) {
++_metrics.page_populations;
_metrics.cached_bytes += cp.size_in_allocator();
_cached_bytes += cp.size_in_allocator();
}
return cp.share();
});
}
future<temporary_buffer<char>> get_page(page_idx_type idx,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state) {
return get_page_ptr(idx, pc, std::move(trace_state)).then([] (cached_page::ptr_type cp) {
return cp->get_buf();
});
}
public:
// Generator of subsequent pages of data reflecting the contents of the file.
// Single-user.
@@ -175,8 +247,8 @@ public:
};
void on_evicted(cached_page& p) {
_metrics.cached_bytes -= p.buf.size();
_cached_bytes -= p.buf.size();
_metrics.cached_bytes -= p.size_in_allocator();
_cached_bytes -= p.size_in_allocator();
++_metrics.page_evictions;
}
@@ -184,9 +256,13 @@ public:
size_t count = 0;
auto disposer = [] (auto* p) noexcept {};
while (start != end) {
++count;
on_evicted(*start);
start = start.erase_and_dispose(disposer, page_idx_less_comparator());
if (start->is_linked()) {
++count;
on_evicted(*start);
start = start.erase_and_dispose(disposer, page_idx_less_comparator());
} else {
++start;
}
}
return count;
}
@@ -198,10 +274,12 @@ public:
/// \param m Metrics object which should be updated from operations on this object.
/// The metrics object can be shared by many cached_file instances, in which case it
/// will reflect the sum of operations on all cached_file instances.
cached_file(file f, cached_file::metrics& m, offset_type start, offset_type size, sstring file_name = {})
cached_file(file f, cached_file::metrics& m, offset_type start, lru& l, logalloc::region& reg, offset_type size, sstring file_name = {})
: _file(std::move(f))
, _file_name(std::move(file_name))
, _metrics(m)
, _lru(l)
, _region(reg)
, _cache(page_idx_less_comparator())
, _start(start)
, _size(size)
@@ -216,6 +294,7 @@ public:
~cached_file() {
evict_range(_cache.begin(), _cache.end());
assert(_cache.empty());
}
/// \brief Populates cache from buf assuming that buf contains the data from the front of the area.
@@ -279,7 +358,7 @@ public:
/// \brief Read from the file
///
/// Returns a stream with data which starts at position pos in the area managed by this instance.
/// This cached_file instance must outlive the returned stream.
/// This cached_file instance must outlive the returned stream and buffers returned by the stream.
/// The stream does not do any read-ahead.
///
/// \param pos The offset of the first byte to read, relative to the cached file area.
@@ -311,6 +390,13 @@ public:
}
};
inline
void cached_file::cached_page::on_evicted() noexcept {
parent->on_evicted(*this);
cached_file::cache_type::iterator it(this);
it.erase(page_idx_less_comparator());
}
class cached_file_impl : public file_impl {
cached_file& _cf;
tracing::trace_state_ptr _trace_state;