sstables: Cache all index file reads
After this patch, there is a single index file page cache per sstable, shared by index readers. The cache survives reads, which reduces the amount of I/O on subsequent reads. As part of this, cached_file needed to be adjusted in the following ways. The page cache may occupy a significant portion of memory. Keeping the pages in the standard allocator could cause memory fragmentation problems. To avoid them, cached_file is changed to keep buffers in LSA using the lsa_buffer allocation method. When a page is needed by the seastar I/O layer, it needs to be copied to a temporary_buffer which is stable, so it must be allocated in the standard allocator space. We copy the page on demand. Concurrent requests for the same page will share the temporary_buffer. When a page is not used, it only lives in the LSA space. In the subsequent patches cached_file::stream will be adjusted to also support access via cached_page::ptr_type directly, to avoid materializing a temporary_buffer. While a page is used, it is not linked in the LRU so that it is not freed. This ensures that the storage which is actively consumed remains stable, either via temporary_buffer (kept alive by its deleter), or by cached_page::ptr_type directly.
This commit is contained in:
@@ -292,17 +292,10 @@ std::unique_ptr<clustered_index_cursor> promoted_index::make_cursor(shared_sstab
|
||||
}
|
||||
|
||||
if (_use_binary_search) {
|
||||
cached_file f(_index_file,
|
||||
index_page_cache_metrics,
|
||||
_promoted_index_start,
|
||||
_promoted_index_size,
|
||||
trace_state ? sst->filename(component_type::Index) : sstring());
|
||||
|
||||
f.populate_front(_front.share());
|
||||
|
||||
return std::make_unique<mc::bsearch_clustered_cursor>(*sst->get_schema(),
|
||||
_promoted_index_start, _promoted_index_size,
|
||||
promoted_index_cache_metrics, permit,
|
||||
*ck_values_fixed_lengths, std::move(f), options.io_priority_class, _num_blocks, trace_state);
|
||||
*ck_values_fixed_lengths, *sst->_cached_index_file, options.io_priority_class, _num_blocks, trace_state);
|
||||
}
|
||||
|
||||
input_stream<char> promoted_index_stream = [&] {
|
||||
|
||||
@@ -169,10 +169,12 @@ private:
|
||||
block_set_type _blocks;
|
||||
public:
|
||||
const schema& _s;
|
||||
uint64_t _promoted_index_start;
|
||||
uint64_t _promoted_index_size;
|
||||
metrics& _metrics;
|
||||
const pi_index_type _blocks_count;
|
||||
const io_priority_class _pc;
|
||||
cached_file _cached_file;
|
||||
cached_file& _cached_file;
|
||||
data_consumer::primitive_consumer _primitive_parser;
|
||||
clustering_parser _clustering_parser;
|
||||
promoted_index_block_parser _block_parser;
|
||||
@@ -197,11 +199,11 @@ private:
|
||||
// The offset is relative to the promoted index start in the index file.
|
||||
// idx must be in the range 0..(_blocks_count-1)
|
||||
pi_offset_type get_offset_entry_pos(pi_index_type idx) const {
|
||||
return _cached_file.size() - (_blocks_count - idx) * sizeof(pi_offset_type);
|
||||
return _promoted_index_size - (_blocks_count - idx) * sizeof(pi_offset_type);
|
||||
}
|
||||
|
||||
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(get_offset_entry_pos(idx), _pc, _permit, trace_state);
|
||||
_stream = _cached_file.read(_promoted_index_start + get_offset_entry_pos(idx), _pc, _permit, trace_state);
|
||||
return _stream.next().then([this, idx] (temporary_buffer<char>&& buf) {
|
||||
if (__builtin_expect(_primitive_parser.read_32(buf) == data_consumer::read_status::ready, true)) {
|
||||
return make_ready_future<pi_offset_type>(_primitive_parser._u32);
|
||||
@@ -215,7 +217,7 @@ private:
|
||||
// Postconditions:
|
||||
// - block.start is engaged and valid.
|
||||
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(block.offset, _pc, _permit, trace_state);
|
||||
_stream = _cached_file.read(_promoted_index_start + block.offset, _pc, _permit, trace_state);
|
||||
_clustering_parser.reset();
|
||||
return consume_stream(_stream, _clustering_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
@@ -227,7 +229,7 @@ private:
|
||||
// Postconditions:
|
||||
// - block.end is engaged, all fields in the block are valid
|
||||
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(block.offset, _pc, _permit, trace_state);
|
||||
_stream = _cached_file.read(_promoted_index_start + block.offset, _pc, _permit, trace_state);
|
||||
_block_parser.reset();
|
||||
return consume_stream(_stream, _block_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
@@ -267,18 +269,22 @@ private:
|
||||
}
|
||||
public:
|
||||
cached_promoted_index(const schema& s,
|
||||
uint64_t promoted_index_start,
|
||||
uint64_t promoted_index_size,
|
||||
metrics& m,
|
||||
reader_permit permit,
|
||||
column_values_fixed_lengths cvfl,
|
||||
cached_file f,
|
||||
cached_file& f,
|
||||
io_priority_class pc,
|
||||
pi_index_type blocks_count)
|
||||
: _blocks(block_comparator{s})
|
||||
, _s(s)
|
||||
, _promoted_index_start(promoted_index_start)
|
||||
, _promoted_index_size(promoted_index_size)
|
||||
, _metrics(m)
|
||||
, _blocks_count(blocks_count)
|
||||
, _pc(pc)
|
||||
, _cached_file(std::move(f))
|
||||
, _cached_file(f)
|
||||
, _primitive_parser(permit)
|
||||
, _clustering_parser(s, permit, cvfl, true)
|
||||
, _block_parser(s, permit, std::move(cvfl))
|
||||
@@ -343,8 +349,6 @@ public:
|
||||
|
||||
// Invalidates information about blocks with smaller indexes than a given block.
|
||||
void invalidate_prior(promoted_index_block* block, tracing::trace_state_ptr trace_state) {
|
||||
_cached_file.invalidate_at_most_front(block->offset, trace_state);
|
||||
_cached_file.invalidate_at_most(get_offset_entry_pos(0), get_offset_entry_pos(block->index), trace_state);
|
||||
erase_range(_blocks.begin(), _blocks.lower_bound(block->index));
|
||||
}
|
||||
|
||||
@@ -438,16 +442,26 @@ private:
|
||||
}
|
||||
public:
|
||||
bsearch_clustered_cursor(const schema& s,
|
||||
uint64_t promoted_index_start,
|
||||
uint64_t promoted_index_size,
|
||||
cached_promoted_index::metrics& metrics,
|
||||
reader_permit permit,
|
||||
column_values_fixed_lengths cvfl,
|
||||
cached_file f,
|
||||
cached_file& f,
|
||||
io_priority_class pc,
|
||||
pi_index_type blocks_count,
|
||||
tracing::trace_state_ptr trace_state)
|
||||
: _s(s)
|
||||
, _blocks_count(blocks_count)
|
||||
, _promoted_index(s, metrics, std::move(permit), std::move(cvfl), std::move(f), pc, blocks_count)
|
||||
, _promoted_index(s,
|
||||
promoted_index_start,
|
||||
promoted_index_size,
|
||||
metrics,
|
||||
std::move(permit),
|
||||
std::move(cvfl),
|
||||
f,
|
||||
pc,
|
||||
blocks_count)
|
||||
, _trace_state(std::move(trace_state))
|
||||
{ }
|
||||
|
||||
|
||||
@@ -85,6 +85,7 @@
|
||||
#include "kl/reader.hh"
|
||||
#include "mx/reader.hh"
|
||||
#include "utils/bit_cast.hh"
|
||||
#include "utils/cached_file.hh"
|
||||
|
||||
thread_local disk_error_signal_type sstable_read_error;
|
||||
thread_local disk_error_signal_type sstable_write_error;
|
||||
@@ -1343,6 +1344,13 @@ future<> sstable::update_info_for_opened_data() {
|
||||
}).then([this] {
|
||||
return _index_file.size().then([this] (auto size) {
|
||||
_index_file_size = size;
|
||||
assert(!_cached_index_file);
|
||||
_cached_index_file = seastar::make_shared<cached_file>(_index_file,
|
||||
index_page_cache_metrics, 0,
|
||||
_manager.get_cache_tracker().get_lru(),
|
||||
_manager.get_cache_tracker().region(),
|
||||
_index_file_size);
|
||||
_index_file = make_cached_seastar_file(*_cached_index_file);
|
||||
});
|
||||
}).then([this] {
|
||||
if (this->has_component(component_type::Filter)) {
|
||||
@@ -2849,6 +2857,8 @@ future<> init_metrics() {
|
||||
sm::description("Total number of index page cache pages which were inserted into the cache")),
|
||||
sm::make_gauge("index_page_cache_bytes", [] { return index_page_cache_metrics.cached_bytes; },
|
||||
sm::description("Total number of bytes cached in the index page cache")),
|
||||
sm::make_gauge("index_page_cache_bytes_in_std", [] { return index_page_cache_metrics.bytes_in_std; },
|
||||
sm::description("Total number of bytes in temporary buffers which live in the std allocator")),
|
||||
|
||||
sm::make_derive("pi_cache_hits_l0", [] { return promoted_index_cache_metrics.hits_l0; },
|
||||
sm::description("Number of requests for promoted index block in state l0 which didn't have to go to the page cache")),
|
||||
|
||||
@@ -68,6 +68,7 @@
|
||||
|
||||
class sstable_assertions;
|
||||
class flat_mutation_reader;
|
||||
class cached_file;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -457,6 +458,7 @@ private:
|
||||
// it is then used to generate the ancestors metadata in the statistics or scylla components.
|
||||
std::set<int> _compaction_ancestors;
|
||||
file _index_file;
|
||||
seastar::shared_ptr<cached_file> _cached_index_file;
|
||||
file _data_file;
|
||||
uint64_t _data_file_size;
|
||||
uint64_t _index_file_size;
|
||||
@@ -815,6 +817,7 @@ public:
|
||||
friend class sstable_writer;
|
||||
friend class mc::writer;
|
||||
friend class index_reader;
|
||||
friend class promoted_index;
|
||||
friend class compaction;
|
||||
friend class sstables_manager;
|
||||
template <typename DataConsumeRowsContext>
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include <seastar/core/iostream.hh>
|
||||
#include <seastar/core/when_all.hh>
|
||||
#include <seastar/core/fstream.hh>
|
||||
#include <seastar/core/file.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
@@ -36,6 +37,8 @@
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
static lru cf_lru;
|
||||
|
||||
static sstring read_to_string(cached_file::stream& s, size_t limit = std::numeric_limits<size_t>::max()) {
|
||||
sstring b;
|
||||
while (auto buf = s.next().get0()) {
|
||||
@@ -96,7 +99,8 @@ SEASTAR_THREAD_TEST_CASE(test_file_wrapper) {
|
||||
auto page_size = cached_file::page_size;
|
||||
cached_file::metrics metrics;
|
||||
test_file tf = make_test_file(page_size * 3);
|
||||
cached_file cf(tf.f, metrics, 0, page_size * 3);
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, 0, cf_lru, region, page_size * 3);
|
||||
seastar::file f = make_cached_seastar_file(cf);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(0, 1),
|
||||
@@ -114,17 +118,42 @@ SEASTAR_THREAD_TEST_CASE(test_file_wrapper) {
|
||||
BOOST_CHECK_THROW(read_to_string(f, 0, cf.size() + 1), seastar::file::eof_error);
|
||||
}
|
||||
|
||||
// Issues two concurrent one-byte reads of the same, not yet cached, page
// through the seastar::file wrapper and checks the resulting metrics:
// both reads are counted as misses, but only a single page is populated
// and accounted in cached_bytes.
SEASTAR_THREAD_TEST_CASE(test_concurrent_population) {
|
||||
auto page_size = cached_file::page_size;
|
||||
cached_file::metrics metrics;
|
||||
test_file tf = make_test_file(page_size * 3);
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, 0, cf_lru, region, page_size * 3);
|
||||
seastar::file f = make_cached_seastar_file(cf);
|
||||
|
||||
// Both readers request byte 0, i.e. the same page.
seastar::when_all(
|
||||
seastar::async([&] {
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(0, 1), read_to_string(f, 0, 1));
|
||||
}),
|
||||
seastar::async([&] {
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(0, 1), read_to_string(f, 0, 1));
|
||||
})
|
||||
).get();
|
||||
|
||||
// Two misses, but exactly one page inserted into the cache.
BOOST_REQUIRE_EQUAL(page_size, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(2, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_populations);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
|
||||
test_file tf = make_test_file(1024);
|
||||
|
||||
{
|
||||
cached_file::metrics metrics;
|
||||
cached_file cf(tf.f, metrics, 0, tf.contents.size());
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, 0, cf_lru, region, tf.contents.size());
|
||||
|
||||
{
|
||||
BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(cached_file::page_size, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
@@ -134,7 +163,7 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
|
||||
{
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(2), read_to_string(cf, 2));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(cached_file::page_size, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_hits); // change here
|
||||
@@ -145,7 +174,7 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
|
||||
BOOST_REQUIRE_EQUAL(sstring(), read_to_string(cf, 3000));
|
||||
|
||||
// no change
|
||||
BOOST_REQUIRE_EQUAL(1024, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(cached_file::page_size, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
|
||||
@@ -156,7 +185,8 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
|
||||
{
|
||||
size_t off = 100;
|
||||
cached_file::metrics metrics;
|
||||
cached_file cf(tf.f, metrics, off, tf.contents.size() - off);
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, off, cf_lru, region, tf.contents.size() - off);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(off), read_to_string(cf, 0));
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(off + 2), read_to_string(cf, 2));
|
||||
@@ -164,12 +194,86 @@ SEASTAR_THREAD_TEST_CASE(test_reading_from_small_file) {
|
||||
}
|
||||
}
|
||||
|
||||
// Checks that pages cached by a cached_file can be evicted through the LRU
// passed to it (cf_lru) and that the metrics follow: a full read populates
// the cache, evict_all() empties it, a second full read misses and
// repopulates, and a page which keeps being read survives single evict()
// calls.
SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
auto page = cached_file::page_size;
|
||||
// Two full pages plus 12 bytes, so a full read touches three pages.
auto file_size = page * 2 + 12;
|
||||
test_file tf = make_test_file(file_size);
|
||||
|
||||
{
|
||||
cached_file::metrics metrics;
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, 0, cf_lru, region, tf.contents.size());
|
||||
|
||||
// Initial full read: three misses, three populations.
// cached_bytes is expected to be page * 3 although the last page holds only 12 bytes.
{
|
||||
BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(page * 3, metrics.cached_bytes);
|
||||
BOOST_REQUIRE_EQUAL(page * 3, cf.cached_bytes());
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
|
||||
}
|
||||
|
||||
// Evicting everything through the LRU drops all three pages.
{
|
||||
cf_lru.evict_all();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.cached_bytes); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, cf.cached_bytes()); // change here
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
|
||||
}
|
||||
|
||||
// Reading again after eviction misses on, and repopulates, all three pages.
{
|
||||
BOOST_REQUIRE_EQUAL(tf.contents, read_to_string(cf, 0));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(page * 3, metrics.cached_bytes); // change here
|
||||
BOOST_REQUIRE_EQUAL(page * 3, cf.cached_bytes());
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_misses); // change here
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_populations); // change here
|
||||
}
|
||||
|
||||
{
|
||||
// Test that the page which is touched is evicted last
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(page, 1), read_to_string(cf, page, 1)); // hit page 1
|
||||
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions);
|
||||
BOOST_REQUIRE_EQUAL(1, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_populations);
|
||||
|
||||
// Each evict() below is expected to evict exactly one page, and not page 1.
cf_lru.evict();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(page, 1), read_to_string(cf, page, 1)); // hit page 1
|
||||
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(4, metrics.page_evictions); // change
|
||||
BOOST_REQUIRE_EQUAL(2, metrics.page_hits); // change
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_populations);
|
||||
|
||||
cf_lru.evict();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(page, 1), read_to_string(cf, page, 1)); // hit page 1
|
||||
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_misses);
|
||||
BOOST_REQUIRE_EQUAL(5, metrics.page_evictions); // change
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_hits); // change
|
||||
BOOST_REQUIRE_EQUAL(6, metrics.page_populations);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_invalidation) {
|
||||
auto page_size = cached_file::page_size;
|
||||
test_file tf = make_test_file(page_size * 2);
|
||||
|
||||
cached_file::metrics metrics;
|
||||
cached_file cf(tf.f, metrics, 0, page_size * 2);
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, 0, cf_lru, region, page_size * 2);
|
||||
|
||||
// Reads one page, half of the first page and half of the second page.
|
||||
auto read = [&] {
|
||||
@@ -260,7 +364,8 @@ SEASTAR_THREAD_TEST_CASE(test_invalidation_skewed_cached_file) {
|
||||
|
||||
size_t offset = page_size / 2;
|
||||
cached_file::metrics metrics;
|
||||
cached_file cf(tf.f, metrics, offset, page_size * 2);
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, offset, cf_lru, region, page_size * 2);
|
||||
|
||||
// Reads one page, half of the first page and half of the second page.
|
||||
auto read = [&] {
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "reader_permit.hh"
|
||||
#include "utils/div_ceil.hh"
|
||||
#include "utils/bptree.hh"
|
||||
#include "utils/lru.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
|
||||
#include <seastar/core/file.hh>
|
||||
@@ -35,7 +36,7 @@ using namespace seastar;
|
||||
/// \brief A read-through cache of a file.
|
||||
///
|
||||
/// Caches contents with page granularity (4 KiB).
|
||||
/// Cached pages are evicted manually using the invalidate_*() method family, or when the object is destroyed.
|
||||
/// Cached pages are evicted by the LRU or manually using the invalidate_*() method family, or when the object is destroyed.
|
||||
///
|
||||
/// Concurrent reading is allowed.
|
||||
///
|
||||
@@ -68,19 +69,78 @@ public:
|
||||
uint64_t page_evictions = 0;
|
||||
uint64_t page_populations = 0;
|
||||
uint64_t cached_bytes = 0;
|
||||
uint64_t bytes_in_std = 0; // memory used by active temporary_buffer:s
|
||||
};
|
||||
private:
|
||||
class cached_page {
|
||||
class cached_page : public evictable {
|
||||
public:
|
||||
cached_file* parent;
|
||||
page_idx_type idx;
|
||||
temporary_buffer<char> buf;
|
||||
logalloc::lsa_buffer _lsa_buf;
|
||||
temporary_buffer<char> _buf; // Empty when not shared. May mirror _lsa_buf when shared.
|
||||
size_t _use_count = 0;
|
||||
public:
|
||||
// Deleter used by ptr_type. It does not free the cached_page; it drops one
// use of it. When the last use is dropped, the temporary_buffer copy (_buf)
// is released, bytes_in_std is decreased by its size, and the page is added
// to the parent's LRU.
struct cached_page_del {
|
||||
void operator()(cached_page* cp) {
|
||||
if (--cp->_use_count == 0) {
|
||||
// Account for the released buffer before resetting it, while its size is still known.
cp->parent->_metrics.bytes_in_std -= cp->_buf.size();
|
||||
cp->_buf = {};
|
||||
cp->parent->_lru.add(*cp);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
using ptr_type = std::unique_ptr<cached_page, cached_page_del>;
|
||||
|
||||
// As long as any ptr_type is alive, this cached_page will not be destroyed
|
||||
// because it will not be linked in the LRU.
|
||||
// Registers one more user of this page and returns a ptr_type handle for it.
// Only the first user (use count going from 0 to 1) unlinks the page from the LRU.
ptr_type share() noexcept {
|
||||
if (_use_count++ == 0) {
|
||||
unlink_from_lru();
|
||||
}
|
||||
return std::unique_ptr<cached_page, cached_page_del>(this);
|
||||
}
|
||||
public:
|
||||
explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
|
||||
: parent(parent)
|
||||
, idx(idx)
|
||||
, buf(std::move(buf))
|
||||
{ }
|
||||
cached_page(cached_page&&) noexcept = default;
|
||||
, _buf(std::move(buf))
|
||||
{
|
||||
_lsa_buf = parent->_region.alloc_buf(_buf.size());
|
||||
parent->_metrics.bytes_in_std += _buf.size();
|
||||
std::copy(_buf.begin(), _buf.end(), _lsa_buf.get());
|
||||
}
|
||||
|
||||
cached_page(cached_page&&) noexcept {
|
||||
// The move constructor is required by allocation_strategy::construct() due to generic bplus::tree,
|
||||
// but the object is always allocated in the standard allocator context so never actually moved.
|
||||
// We cannot properly implement the move constructor because "this" is captured in various places.
|
||||
abort();
|
||||
}
|
||||
|
||||
~cached_page() {
|
||||
assert(!_use_count);
|
||||
}
|
||||
|
||||
void on_evicted() noexcept override;
|
||||
|
||||
// Returns a temporary_buffer giving access to the page contents.
// If _buf is currently empty, it is created with the size of _lsa_buf,
// filled by copying from _lsa_buf, and its size is added to bytes_in_std.
// The returned buffer points at _buf's storage; its deleter owns a ptr_type
// to this page obtained via share().
temporary_buffer<char> get_buf() {
|
||||
auto self = share();
|
||||
if (!_buf) {
|
||||
_buf = temporary_buffer<char>(_lsa_buf.size());
|
||||
parent->_metrics.bytes_in_std += _lsa_buf.size();
|
||||
std::copy(_lsa_buf.get(), _lsa_buf.get() + _lsa_buf.size(), _buf.get_write());
|
||||
}
|
||||
// Holding to a temporary buffer holds the cached page so that the buffer can be reused by concurrent hits.
|
||||
// Also, sharing cached_page keeps the temporary_buffer's storage alive.
|
||||
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
|
||||
}
|
||||
|
||||
size_t size_in_allocator() {
|
||||
// lsa_buf occupies 4K in LSA even if the buf size is smaller.
|
||||
// _buf is transient and not accounted here.
|
||||
return page_size;
|
||||
}
|
||||
};
|
||||
|
||||
struct page_idx_less_comparator {
|
||||
@@ -92,6 +152,8 @@ private:
|
||||
file _file;
|
||||
sstring _file_name; // for logging / tracing
|
||||
metrics& _metrics;
|
||||
lru& _lru;
|
||||
logalloc::region& _region;
|
||||
|
||||
using cache_type = bplus::tree<page_idx_type, cached_page, page_idx_less_comparator, 12, bplus::key_search::linear>;
|
||||
cache_type _cache;
|
||||
@@ -103,27 +165,37 @@ private:
|
||||
offset_type _last_page_size; // Ignores _start in case the start lies on the same page.
|
||||
page_idx_type _last_page;
|
||||
private:
|
||||
future<temporary_buffer<char>> get_page(page_idx_type idx, const io_priority_class& pc,
|
||||
future<cached_page::ptr_type> get_page_ptr(page_idx_type idx, const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto i = _cache.lower_bound(idx);
|
||||
if (i != _cache.end() && i->idx == idx) {
|
||||
++_metrics.page_hits;
|
||||
tracing::trace(trace_state, "page cache hit: file={}, page={}", _file_name, idx);
|
||||
cached_page& cp = *i;
|
||||
return make_ready_future<temporary_buffer<char>>(cp.buf.share());
|
||||
return make_ready_future<cached_page::ptr_type>(cp.share());
|
||||
}
|
||||
tracing::trace(trace_state, "page cache miss: file={}, page={}", _file_name, idx);
|
||||
++_metrics.page_misses;
|
||||
auto size = idx == _last_page ? _last_page_size : page_size;
|
||||
return _file.dma_read_exactly<char>(idx * page_size, size, pc)
|
||||
.then([this, idx] (temporary_buffer<char>&& buf) mutable {
|
||||
++_metrics.page_populations;
|
||||
_metrics.cached_bytes += buf.size();
|
||||
_cached_bytes += buf.size();
|
||||
_cache.emplace(idx, cached_page(this, idx, buf.share()));
|
||||
return std::move(buf);
|
||||
auto it_and_flag = _cache.emplace(idx, this, idx, std::move(buf));
|
||||
cached_page& cp = *it_and_flag.first;
|
||||
if (it_and_flag.second) {
|
||||
++_metrics.page_populations;
|
||||
_metrics.cached_bytes += cp.size_in_allocator();
|
||||
_cached_bytes += cp.size_in_allocator();
|
||||
}
|
||||
return cp.share();
|
||||
});
|
||||
}
|
||||
// Obtains page idx via get_page_ptr() and converts the resulting page
// handle into a temporary_buffer using cached_page::get_buf().
future<temporary_buffer<char>> get_page(page_idx_type idx,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return get_page_ptr(idx, pc, std::move(trace_state)).then([] (cached_page::ptr_type cp) {
|
||||
return cp->get_buf();
|
||||
});
|
||||
}
|
||||
public:
|
||||
// Generator of subsequent pages of data reflecting the contents of the file.
|
||||
// Single-user.
|
||||
@@ -175,8 +247,8 @@ public:
|
||||
};
|
||||
|
||||
void on_evicted(cached_page& p) {
|
||||
_metrics.cached_bytes -= p.buf.size();
|
||||
_cached_bytes -= p.buf.size();
|
||||
_metrics.cached_bytes -= p.size_in_allocator();
|
||||
_cached_bytes -= p.size_in_allocator();
|
||||
++_metrics.page_evictions;
|
||||
}
|
||||
|
||||
@@ -184,9 +256,13 @@ public:
|
||||
size_t count = 0;
|
||||
auto disposer = [] (auto* p) noexcept {};
|
||||
while (start != end) {
|
||||
++count;
|
||||
on_evicted(*start);
|
||||
start = start.erase_and_dispose(disposer, page_idx_less_comparator());
|
||||
if (start->is_linked()) {
|
||||
++count;
|
||||
on_evicted(*start);
|
||||
start = start.erase_and_dispose(disposer, page_idx_less_comparator());
|
||||
} else {
|
||||
++start;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
@@ -198,10 +274,12 @@ public:
|
||||
/// \param m Metrics object which should be updated from operations on this object.
|
||||
/// The metrics object can be shared by many cached_file instances, in which case it
|
||||
/// will reflect the sum of operations on all cached_file instances.
|
||||
cached_file(file f, cached_file::metrics& m, offset_type start, offset_type size, sstring file_name = {})
|
||||
cached_file(file f, cached_file::metrics& m, offset_type start, lru& l, logalloc::region& reg, offset_type size, sstring file_name = {})
|
||||
: _file(std::move(f))
|
||||
, _file_name(std::move(file_name))
|
||||
, _metrics(m)
|
||||
, _lru(l)
|
||||
, _region(reg)
|
||||
, _cache(page_idx_less_comparator())
|
||||
, _start(start)
|
||||
, _size(size)
|
||||
@@ -216,6 +294,7 @@ public:
|
||||
|
||||
~cached_file() {
|
||||
evict_range(_cache.begin(), _cache.end());
|
||||
assert(_cache.empty());
|
||||
}
|
||||
|
||||
/// \brief Populates cache from buf assuming that buf contains the data from the front of the area.
|
||||
@@ -279,7 +358,7 @@ public:
|
||||
/// \brief Read from the file
|
||||
///
|
||||
/// Returns a stream with data which starts at position pos in the area managed by this instance.
|
||||
/// This cached_file instance must outlive the returned stream.
|
||||
/// This cached_file instance must outlive the returned stream and buffers returned by the stream.
|
||||
/// The stream does not do any read-ahead.
|
||||
///
|
||||
/// \param pos The offset of the first byte to read, relative to the cached file area.
|
||||
@@ -311,6 +390,13 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// Eviction hook declared with `override` in cached_page (which derives from
// evictable). Presumably invoked by the LRU when it evicts this page -- confirm
// against utils/lru.hh.
// Notifies the owning cached_file so it can update its accounting, then
// erases this page from the cache tree through an iterator built from `this`.
inline
|
||||
void cached_file::cached_page::on_evicted() noexcept {
|
||||
parent->on_evicted(*this);
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
}
|
||||
|
||||
class cached_file_impl : public file_impl {
|
||||
cached_file& _cf;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
|
||||
Reference in New Issue
Block a user