Files
scylladb/utils/cached_file.hh
Tomasz Grabiec 082342ecad Attach names to allocating sections for better debuggability
Large reserves in allocating_section can cause stalls. We already log
reserve increase, but we don't know which table it belongs to:

  lsa - LSA allocation failure, increasing reserve in section 0x600009f94590 to 128 segments;

Allocating sections used for updating row cache on memtable flush are
notoriously problematic. Each table has its own row_cache, so its own
allocating_section(s). If we attached table name to those sections, we
could identify which table is causing problems. In some issues we
suspected system.raft, but we can't be sure.

This patch allows naming allocating_sections for the purpose of
identifying them in such log messages. I use abstract_formatter for
this purpose to avoid the cost of formatting strings on the hot path
(e.g. index_reader). And also to avoid duplicating strings which are
already stored elsewhere.

Fixes #25799

Closes scylladb/scylladb#27470
2025-12-07 14:14:25 +02:00

658 lines
26 KiB
C++

/*
* Copyright (C) 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "reader_permit.hh"
#include "utils/assert.hh"
#include "utils/div_ceil.hh"
#include "utils/bptree.hh"
#include "utils/logalloc.hh"
#include "utils/lru.hh"
#include "utils/error_injection.hh"
#include "tracing/trace_state.hh"
#include "utils/cached_file_stats.hh"
#include <seastar/core/file.hh>
#include <seastar/coroutine/maybe_yield.hh>
using namespace seastar;
/// \brief A read-through cache of a file.
///
/// Caches contents with page granularity (4 KiB).
/// Cached pages are evicted by the LRU or manually using the invalidate_*() method family, or when the object is destroyed.
///
/// Concurrent reading is allowed.
///
/// The object is movable but this is only allowed before readers are created.
///
class cached_file {
public:
    // Must be aligned to _file.disk_read_dma_alignment(). 4K is always safe.
    static constexpr size_t page_size = 4096;

    // The content of the underlying file (_file) is divided into pages
    // of equal size (page_size). This type is used to identify pages.
    // Pages are assigned consecutive identifiers starting from 0.
    using page_idx_type = uint64_t;

    // Represents the number of pages in a sequence.
    using page_count_type = uint64_t;

    // Byte offset within the file.
    using offset_type = uint64_t;
private:
    // A single cached page.
    // The object itself lives in the standard allocator space (inside the bplus::tree node),
    // while the page contents live in the LSA region (_lsa_buf) so they can be compacted.
    // The page is linked in the LRU only while unshared (_use_count == 0); sharing a page
    // unlinks it, which pins it against eviction.
    class cached_page final : public index_evictable {
    public:
        cached_file* parent;
        page_idx_type idx;
        logalloc::lsa_buffer _lsa_buf;
        temporary_buffer<char> _buf; // Empty when not shared. May mirror _lsa_buf when shared.
        size_t _use_count = 0;
    public:
        // Deleter for ptr_type. Dropping the last reference releases the std-space
        // mirror buffer and re-links the page into the LRU, making it evictable again.
        struct cached_page_del {
            void operator()(cached_page* cp) {
                if (--cp->_use_count == 0) {
                    cp->parent->_metrics.bytes_in_std -= cp->_buf.size();
                    cp->_buf = {};
                    cp->parent->_lru.add(*cp);
                }
            }
        };
        using ptr_type = std::unique_ptr<cached_page, cached_page_del>;

        // As long as any ptr_type is alive, this cached_page will not be destroyed
        // because it will not be linked in the LRU.
        ptr_type share() noexcept {
            if (_use_count++ == 0) {
                if (is_linked()) {
                    parent->_lru.remove(*this);
                }
            }
            return std::unique_ptr<cached_page, cached_page_del>(this);
        }

        // Returns true iff no other ptr_type (besides at most one) shares this page.
        bool only_ref() const {
            return _use_count <= 1;
        }
    public:
        // Copies buf's contents into a freshly allocated LSA buffer.
        // _buf keeps the std-space copy; it is dropped when the page stops being shared.
        explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
            : parent(parent)
            , idx(idx)
            , _buf(std::move(buf))
        {
            _lsa_buf = parent->_region.alloc_buf(_buf.size());
            parent->_metrics.bytes_in_std += _buf.size();
            std::copy(_buf.begin(), _buf.end(), _lsa_buf.get());
        }
        cached_page(cached_page&&) noexcept {
            // The move constructor is required by allocation_strategy::construct() due to generic bplus::tree,
            // but the object is always allocated in the standard allocator context so never actually moved.
            // We cannot properly implement the move constructor because "this" is captured in various places.
            abort();
        }
        ~cached_page() {
            SCYLLA_ASSERT(!_use_count);
        }
        // Byte offset of this page within the file.
        size_t pos() const {
            return idx * page_size;
        }
        void on_evicted() noexcept override;
        // Returns a buffer reflecting the page contents which is stable in the
        // std allocator space (safe across LSA compaction). Populates the std-space
        // mirror (_buf) on first use so it can be reused by concurrent hits.
        temporary_buffer<char> get_buf() {
            auto self = share();
            if (!_buf) {
                _buf = temporary_buffer<char>(_lsa_buf.size());
                parent->_metrics.bytes_in_std += _lsa_buf.size();
                std::copy(_lsa_buf.get(), _lsa_buf.get() + _lsa_buf.size(), _buf.get_write());
            }
            // Holding to a temporary buffer holds the cached page so that the buffer can be reused by concurrent hits.
            // Also, sharing cached_page keeps the temporary_buffer's storage alive.
            return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
        }
        // Returns a pointer to the contents of the page.
        // The buffer is invalidated when the page is evicted or when the owning LSA region invalidates references.
        char* begin() {
            return _lsa_buf.get();
        }
        // Returns a read-only view of the contents of the page.
        // The view can be invalidated when the page is evicted or when the owning LSA region invalidates references.
        std::span<const std::byte> get_view() const {
            return std::as_bytes(std::span<const char>(_lsa_buf.get(), _lsa_buf.size()));
        }
        size_t size_in_allocator() {
            // lsa_buf occupies 4K in LSA even if the buf size is smaller.
            // _buf is transient and not accounted here.
            return page_size;
        }
    };

    // Key comparator for the page index tree.
    struct page_idx_less_comparator {
        bool operator()(page_idx_type lhs, page_idx_type rhs) const noexcept {
            return lhs < rhs;
        }
    };

    file _file;
    sstring _file_name; // for logging / tracing
    cached_file_stats& _metrics;
    lru& _lru;
    logalloc::region& _region;
    logalloc::allocating_section _as;
    using cache_type = bplus::tree<page_idx_type, cached_page, page_idx_less_comparator, 12, bplus::key_search::linear>;
    cache_type _cache;
    const offset_type _size;
    offset_type _cached_bytes = 0;
    offset_type _last_page_size; // Number of valid bytes in the last page; may be smaller than page_size.
    page_idx_type _last_page;    // Index of the last page of the cached area.
public:
    using ptr_type = cached_page::ptr_type;

    // Result of a page lookup: a shared pointer to the page, plus a flag saying
    // whether the page was served from cache (true) or had to be read from disk (false).
    struct page_read_result {
        ptr_type ptr;
        bool was_already_cached;
        page_read_result(ptr_type p, bool cached)
            : ptr(std::move(p))
            , was_already_cached(cached)
        {}
    };

    // Returns the page containing the byte at global_pos, without permit-based memory accounting.
    future<page_read_result> get_shared_page(size_t global_pos, tracing::trace_state_ptr trace_state) {
        return get_page_ptr(global_pos / page_size, 1, trace_state);
    }

    // Returns the page containing the byte at global_pos, accounting read memory in permit.
    future<page_read_result> get_shared_page(size_t global_pos, reader_permit permit, tracing::trace_state_ptr trace_state) {
        return get_page_ptr(global_pos / page_size, 1, trace_state, permit);
    }
private:
    // Returns the page at idx, reading up to read_ahead consecutive pages from disk
    // on a miss. Pages read ahead are populated into the cache (and placed in the LRU)
    // but only the first page is returned.
    // NOTE(review): assumes idx <= _last_page — callers (stream, get_shared_page) are
    // expected to guarantee this; otherwise the size computation below would underflow.
    future<page_read_result> get_page_ptr(page_idx_type idx,
                                          page_count_type read_ahead,
                                          tracing::trace_state_ptr trace_state,
                                          std::optional<reader_permit> permit = {}) {
        auto i = _cache.lower_bound(idx);
        if (i != _cache.end() && i->idx == idx) {
            ++_metrics.page_hits;
            tracing::trace(trace_state, "page cache hit: file={}, page={}", _file_name, idx);
            cached_page& cp = *i;
            return make_ready_future<page_read_result>(cp.share(), true);
        }
        tracing::trace(trace_state, "page cache miss: file={}, page={}, readahead={}", _file_name, idx, read_ahead);
        ++_metrics.page_misses;
        // Clamp the read so that it does not extend past the end of the cached area;
        // the last page may be shorter than page_size.
        size_t size = (idx + read_ahead) > _last_page
                ? (_last_page_size + (_last_page - idx) * page_size)
                : read_ahead * page_size;
        std::optional<reader_permit::resource_units> units;
        std::optional<reader_permit::awaits_guard> await_guard;
        if (permit) {
            units = permit->consume_memory(size);
            await_guard.emplace(*permit);
        }
        return _file.dma_read_exactly<char>(idx * page_size, size)
            .then([this, ag = std::move(await_guard), units = std::move(units), idx] (temporary_buffer<char>&& buf) mutable {
                cached_page::ptr_type first_page;
                // Split the read buffer page-by-page and populate the cache.
                while (buf.size()) {
                    auto this_size = std::min(page_size, buf.size());
                    // _cache.emplace() needs to run under allocating section even though it lives in the std space
                    // because bplus::tree operations are not reentrant, so we need to prevent memory reclamation.
                    auto [cp, missed] = _as(_region, [&] {
                        auto this_buf = buf.share();
                        this_buf.trim(this_size);
                        return _cache.emplace(idx, this, idx, std::move(this_buf));
                    });
                    buf.trim_front(this_size);
                    ++idx;
                    if (missed) {
                        ++_metrics.page_populations;
                        _metrics.cached_bytes += cp->size_in_allocator();
                        _cached_bytes += cp->size_in_allocator();
                    }
                    // pages read ahead will be placed into LRU, as there's no guarantee they will be fetched later.
                    cached_page::ptr_type ptr = cp->share();
                    if (!first_page) {
                        first_page = std::move(ptr);
                    }
                }
                utils::get_local_injector().inject("cached_file_get_first_page", []() {
                    throw std::bad_alloc();
                });
                return page_read_result{std::move(first_page), false};
            });
    }

    // Returns (page, true) if the page was cached, and (page, false) if the page was uncached.
    // The returned buffer is stable in std space; its memory is accounted in permit when provided.
    future<std::pair<temporary_buffer<char>, bool>> get_page(page_idx_type idx,
                                                             page_count_type count,
                                                             tracing::trace_state_ptr trace_state,
                                                             std::optional<reader_permit> permit = {}) {
        return get_page_ptr(idx, count, std::move(trace_state), permit).then([permit] (page_read_result read_result) mutable {
            auto buf = read_result.ptr->get_buf();
            if (permit) {
                // Attach the resource units to the buffer so accounting lasts
                // exactly as long as the buffer is alive.
                auto units = permit->consume_memory(buf.size());
                buf = temporary_buffer<char>(buf.get_write(), buf.size(),
                        make_object_deleter(buf.release(), std::move(units)));
            }
            return std::make_pair(std::move(buf), read_result.was_already_cached);
        });
    }
public:
    // A view over a sub-range of a single cached page.
    // Keeps the page shared (pinned against eviction) for the lifetime of the view.
    class page_view {
        cached_page::ptr_type _page;
        size_t _offset;
        size_t _size = 0;
        std::optional<reader_permit::resource_units> _units;
    public:
        page_view() = default;
        page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
            : _page(std::move(page))
            , _offset(offset)
            , _size(size)
            , _units(std::move(units))
        {}
        page_view(page_view&& o) noexcept
            : _page(std::move(o._page))
            , _offset(std::exchange(o._offset, 0))
            , _size(std::exchange(o._size, 0))
            , _units(std::move(o._units))
        {}
        page_view& operator=(page_view&& o) noexcept {
            _page = std::move(o._page);
            _offset = std::exchange(o._offset, 0);
            _size = std::exchange(o._size, 0);
            _units = std::move(o._units);
            return *this;
        }
        // Fills the page with garbage, releases the pointer and evicts the page so that it's no longer in cache.
        // For testing use-after-free on the buffer space.
        // After the call, the object is the same state as after being moved-from.
        void release_and_scramble() noexcept {
            if (_page->only_ref()) {
                std::memset(_page->_lsa_buf.get(), 0xfe, _page->_lsa_buf.size());
                cached_page& cp = *_page;
                // Resetting _page runs the deleter, which re-links cp into the LRU;
                // we then unlink it again and evict it for real.
                _page = nullptr;
                cp.parent->_lru.remove(cp);
                cp.on_evicted();
            } else {
                _page = nullptr;
            }
            _size = 0;
            _offset = 0;
            _units = std::nullopt;
        }
        operator bool() const { return bool(_page) && _size; }
    public: // ContiguousSharedBuffer concept
        const char* begin() const { return _page ? _page->begin() + _offset : nullptr; }
        const char* get() const { return begin(); }
        const char* end() const { return begin() + _size; }
        size_t size() const { return _size; }
        bool empty() const { return !_size; }
        char* get_write() { return const_cast<char*>(begin()); }
        void trim(size_t pos) {
            _size = pos;
        }
        void trim_front(size_t n) {
            _offset += n;
            _size -= n;
        }
        page_view share() {
            return share(0, _size);
        }
        page_view share(size_t pos, size_t len) {
            return page_view(_offset + pos, len, _page->share(), {});
        }
    };

    // Generator of subsequent pages of data reflecting the contents of the file.
    // Single-user.
    class stream {
        cached_file* _cached_file;
        std::optional<reader_permit> _permit;
        page_idx_type _page_idx;
        offset_type _offset_in_page;
        offset_type _size_hint; // Expected number of bytes left to read; drives read-ahead size.
        tracing::trace_state_ptr _trace_state;
    private:
        // Consumes memory units for one returned page when a permit was supplied.
        std::optional<reader_permit::resource_units> get_page_units(size_t size = page_size) {
            return _permit
                ? std::make_optional(_permit->consume_memory(size))
                : std::nullopt;
        }
        void shrink_size_hint(bool page_was_cached) {
            if (page_was_cached) {
                // If the page was cached, shrink the _size_hint by page_size,
                // but don't reduce it below page_size.
                _size_hint = std::max(_size_hint, 2 * page_size) - page_size;
            } else {
                // If the page was uncached, then get_page read the entire _size_hint bytes from disk,
                // (in one I/O operation) and inserted the read pages into the cache.
                // We will most likely serve the remainder of the stream from them.
                //
                // But if some of those pages happen to be evicted before we complete the read
                // (this shouldn't really happen in practice, because in practice pages stay in cache
                // for much, much longer than any read takes, but still), we don't want to read
                // something on the order of _size_hint again, as that could result, in theory,
                // in a quadratic amount of work.
                //
                // So in the very unlikely chance that we will have to re-read something from disk,
                // let's do it page-by-page.
                _size_hint = page_size;
            }
        }
    public:
        // Creates an empty stream.
        stream()
            : _cached_file(nullptr)
        { }
        stream(cached_file& cf, std::optional<reader_permit> permit, tracing::trace_state_ptr trace_state,
                page_idx_type start_page, offset_type start_offset_in_page, offset_type size_hint)
            : _cached_file(&cf)
            , _permit(std::move(permit))
            , _page_idx(start_page)
            , _offset_in_page(start_offset_in_page)
            , _size_hint(size_hint)
            , _trace_state(std::move(trace_state))
        { }
        // Yields the next chunk of data.
        // Returns empty buffer when end-of-stream is reached.
        // Calls must be serialized.
        // This instance must be kept alive until the returned future resolves.
        future<temporary_buffer<char>> next() {
            if (!_cached_file || _page_idx > _cached_file->_last_page) {
                return make_ready_future<temporary_buffer<char>>(temporary_buffer<char>());
            }
            page_count_type readahead = div_ceil(_size_hint, page_size);
            return _cached_file->get_page(_page_idx, readahead, _trace_state, _permit).then(
                    [this] (std::pair<temporary_buffer<char>, bool> read_result) mutable {
                auto page = std::move(read_result.first);
                // Trim the last page to the valid file size, and the first page
                // to the requested starting offset.
                if (_page_idx == _cached_file->_last_page) {
                    page.trim(_cached_file->_last_page_size);
                }
                page.trim_front(_offset_in_page);
                _offset_in_page = 0;
                ++_page_idx;
                shrink_size_hint(read_result.second);
                return page;
            });
        }
        // Yields the next chunk of data.
        // Returns empty buffer when end-of-stream is reached.
        // Calls must be serialized.
        // This instance must be kept alive until the returned future resolves.
        future<page_view> next_page_view() {
            if (!_cached_file || _page_idx > _cached_file->_last_page) {
                return make_ready_future<page_view>(page_view());
            }
            page_count_type readahead = div_ceil(_size_hint, page_size);
            return _cached_file->get_page_ptr(_page_idx, readahead, _trace_state, _permit).then(
                    [this] (page_read_result read_result) mutable {
                auto page = std::move(read_result.ptr);
                size_t size = _page_idx == _cached_file->_last_page
                        ? _cached_file->_last_page_size
                        : page_size;
                page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), get_page_units());
                _offset_in_page = 0;
                ++_page_idx;
                shrink_size_hint(read_result.was_already_cached);
                return buf;
            });
        }
    };

    // Accounting hook invoked whenever page p is evicted from the cache.
    void on_evicted(cached_page& p) {
        _metrics.cached_bytes -= p.size_in_allocator();
        _cached_bytes -= p.size_in_allocator();
        ++_metrics.page_evictions;
    }

    // Evicts all unreferenced (LRU-linked) pages in [start, end); shared pages are skipped.
    // Returns the number of pages evicted.
    size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
        // Tree nodes live in the std allocator space, so erase must run there
        // regardless of the currently active allocator.
        return with_allocator(standard_allocator(), [&] {
            size_t count = 0;
            auto disposer = [] (auto* p) noexcept {};
            while (start != end) {
                if (start->is_linked()) {
                    ++count;
                    _lru.remove(*start);
                    on_evicted(*start);
                    start = start.erase_and_dispose(disposer, page_idx_less_comparator());
                } else {
                    ++start;
                }
            }
            return count;
        });
    }
public:
    /// \brief Constructs a cached_file.
    ///
    /// The cached area will reflect subset of f from the byte range [start, start + size).
    ///
    /// \param m Metrics object which should be updated from operations on this object.
    ///     The metrics object can be shared by many cached_file instances, in which case it
    ///     will reflect the sum of operations on all cached_file instances.
    /// \param file_name Name of the file, used only for logging and tracing.
    cached_file(file f, cached_file_stats& m, lru& l, logalloc::region& reg, offset_type size, sstring file_name = {})
        : _file(std::move(f))
        , _file_name(std::move(file_name))
        , _metrics(m)
        , _lru(l)
        , _region(reg)
        // Name the allocating section so LSA reserve-increase log messages can be
        // attributed to this file. abstract_formatter avoids formatting on the hot path.
        , _as(abstract_formatter([this] (fmt::format_context& ctx) {
            fmt::format_to(ctx.out(), "cached_file {}", _file_name);
        }))
        , _cache(page_idx_less_comparator())
        , _size(size)
    {
        // For an empty file both _last_page and _last_page_size end up 0.
        offset_type last_byte_offset = _size ? (_size - 1) : 0;
        _last_page_size = (last_byte_offset % page_size) + (_size ? 1 : 0);
        _last_page = last_byte_offset / page_size;
    }
    cached_file(cached_file&&) = delete; // captured this
    cached_file(const cached_file&) = delete;
    ~cached_file() {
        evict_range(_cache.begin(), _cache.end());
        SCYLLA_ASSERT(_cache.empty());
    }
    /// \brief Invalidates [start, end) or less.
    ///
    /// Invariants:
    ///
    ///   - all bytes outside [start, end) which were cached before the call will still be cached.
    ///
    void invalidate_at_most(offset_type start, offset_type end, tracing::trace_state_ptr trace_state = {}) {
        auto lo_page = start / page_size
            // If start is page-aligned, its containing page lies entirely within
            // [start, end) and may be dropped. Otherwise the page also holds bytes
            // before start, so skip it. (bool(start != 0) is kept for clarity:
            // start == 0 always allows dropping the first page.)
            + bool(start % page_size) * bool(start != 0);
        auto hi_page = (end) / page_size;
        if (lo_page < hi_page) {
            auto count = evict_range(_cache.lower_bound(lo_page), _cache.lower_bound(hi_page));
            if (count) {
                tracing::trace(trace_state, "page cache: evicted {} page(s) in [{}, {}), file={}", count,
                        lo_page, hi_page, _file_name);
            }
        }
    }
    /// \brief Equivalent to \ref invalidate_at_most(0, end).
    void invalidate_at_most_front(offset_type end, tracing::trace_state_ptr trace_state = {}) {
        auto hi_page = end / page_size;
        auto count = evict_range(_cache.begin(), _cache.lower_bound(hi_page));
        if (count) {
            tracing::trace(trace_state, "page cache: evicted {} page(s) in [0, {}), file={}", count,
                    hi_page, _file_name);
        }
    }
    /// \brief Read from the file
    ///
    /// Returns a stream with data which starts at position pos in the area managed by this instance.
    /// This cached_file instance must outlive the returned stream and buffers returned by the stream.
    /// The stream does not do any read-ahead.
    ///
    /// \param pos The offset of the first byte to read, relative to the cached file area.
    /// \param permit Holds reader_permit under which returned buffers should be accounted.
    ///     When disengaged, no accounting is done.
    stream read(offset_type global_pos, std::optional<reader_permit> permit,
            tracing::trace_state_ptr trace_state = {},
            size_t size_hint = page_size) {
        if (global_pos >= _size) {
            return stream();
        }
        auto offset = global_pos % page_size;
        auto page_idx = global_pos / page_size;
        return stream(*this, std::move(permit), std::move(trace_state), page_idx, offset, size_hint);
    }
    /// \brief Returns the number of bytes in the area managed by this instance.
    offset_type size() const {
        return _size;
    }
    /// \brief Returns the number of bytes cached.
    size_t cached_bytes() const {
        return _cached_bytes;
    }
    /// \brief Returns the underlying file.
    file& get_file() {
        return _file;
    }
    logalloc::region& region() {
        return _region;
    }
    // Evicts all unused pages.
    // Pages which are used are not removed.
    future<> evict_gently() {
        auto i = _cache.begin();
        while (i != _cache.end()) {
            if (i->is_linked()) {
                _lru.remove(*i);
                on_evicted(*i);
                i = i.erase(page_idx_less_comparator());
            } else {
                ++i;
            }
            if (need_preempt() && i != _cache.end()) {
                // Remember the key and re-find it after yielding, since the
                // iterator may be invalidated while we're suspended.
                auto key = i->idx;
                co_await coroutine::maybe_yield();
                i = _cache.lower_bound(key);
            }
        }
    }
};
inline
// Called when the LRU decides to evict this page: updates the owner's accounting
// and unlinks/destroys the page's node in the cache tree.
void cached_file::cached_page::on_evicted() noexcept {
    parent->on_evicted(*this);
    // The bplus::tree nodes live in std space, so the erase must run under the
    // standard allocator — presumably the caller may be inside an LSA allocator
    // context during reclaim (NOTE(review): confirm against lru/logalloc callers).
    with_allocator(standard_allocator(), [this] {
        cached_file::cache_type::iterator it(this);
        it.erase(page_idx_less_comparator());
    });
}
// A seastar::file implementation which serves reads through a cached_file.
// Only dma_read_bulk and the metadata/close operations are supported; write,
// directory, and the iovec/raw-pointer read variants throw.
class cached_file_impl : public file_impl {
    cached_file& _cf;
    tracing::trace_state_ptr _trace_state;
private:
    [[noreturn]] void unsupported() {
        throw_with_backtrace<std::logic_error>("unsupported operation");
    }
public:
    cached_file_impl(cached_file& cf, tracing::trace_state_ptr trace_state = {})
        : file_impl(*get_file_impl(cf.get_file()))
        , _cf(cf)
        , _trace_state(std::move(trace_state))
    { }
    // unsupported
    virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent*) override { unsupported(); }
    virtual future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override { unsupported(); }
    virtual future<> flush(void) override { unsupported(); }
    virtual future<> truncate(uint64_t length) override { unsupported(); }
    virtual future<> discard(uint64_t offset, uint64_t length) override { unsupported(); }
    virtual future<> allocate(uint64_t position, uint64_t length) override { unsupported(); }
    virtual subscription<directory_entry> list_directory(std::function<future<>(directory_entry)>) override { unsupported(); }
    // delegating
    virtual future<struct stat> stat(void) override { return _cf.get_file().stat(); }
    virtual future<uint64_t> size(void) override { return _cf.get_file().size(); }
    virtual future<> close() override { return _cf.get_file().close(); }
    virtual std::unique_ptr<seastar::file_handle_impl> dup() override { return get_file_impl(_cf.get_file())->dup(); }
    // Reads [offset, offset + size) by draining a cached_file::stream.
    // Throws file::eof_error if the stream ends before size bytes are produced.
    virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t size, io_intent* intent) override {
        return do_with(_cf.read(offset, std::nullopt, _trace_state, size), size, temporary_buffer<uint8_t>(),
                [this, size] (cached_file::stream& s, size_t& size_left, temporary_buffer<uint8_t>& result) {
            if (size_left == 0) {
                return make_ready_future<temporary_buffer<uint8_t>>(std::move(result));
            }
            return repeat([this, &s, &size_left, &result, size] {
                return s.next().then([this, &size_left, &result, size] (temporary_buffer<char> buf) {
                    if (!buf) {
                        // Stream exhausted before the requested size was read.
                        throw seastar::file::eof_error();
                    }
                    if (!result) {
                        if (buf.size() >= size_left) {
                            // Fast path: first chunk alone satisfies the whole request,
                            // hand it back without copying.
                            result = temporary_buffer<uint8_t>(reinterpret_cast<uint8_t*>(buf.get_write()), size_left, buf.release());
                            return stop_iteration::yes;
                        }
                        result = temporary_buffer<uint8_t>::aligned(_memory_dma_alignment, size_left);
                    }
                    // Append this chunk at its position within the result buffer.
                    size_t this_len = std::min(buf.size(), size_left);
                    std::copy(buf.begin(), buf.begin() + this_len, result.get_write() + (size - size_left));
                    size_left -= this_len;
                    return stop_iteration(size_left == 0);
                });
            }).then([&] {
                return std::move(result);
            });
        });
    }
    virtual future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override {
        unsupported(); // FIXME
    }
    virtual future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override {
        unsupported(); // FIXME
    }
};
// Wraps a cached_file in a seastar::file so generic file consumers read through the cache.
// The caller must keep cf alive for as long as the returned file is in use.
inline
file make_cached_seastar_file(cached_file& cf) {
    auto impl = make_shared<cached_file_impl>(cf);
    return file(std::move(impl));
}